git_store.rs

   1pub mod branch_diff;
   2mod conflict_set;
   3pub mod git_traversal;
   4pub mod pending_op;
   5
   6use crate::{
   7    ProjectEnvironment, ProjectItem, ProjectPath,
   8    buffer_store::{BufferStore, BufferStoreEvent},
   9    worktree_store::{WorktreeStore, WorktreeStoreEvent},
  10};
  11use anyhow::{Context as _, Result, anyhow, bail};
  12use askpass::{AskPassDelegate, EncryptedPassword, IKnowWhatIAmDoingAndIHaveReadTheDocs};
  13use buffer_diff::{BufferDiff, BufferDiffEvent};
  14use client::ProjectId;
  15use collections::HashMap;
  16pub use conflict_set::{ConflictRegion, ConflictSet, ConflictSetSnapshot, ConflictSetUpdate};
  17use fs::Fs;
  18use futures::{
  19    FutureExt, StreamExt,
  20    channel::{
  21        mpsc,
  22        oneshot::{self, Canceled},
  23    },
  24    future::{self, Shared},
  25    stream::FuturesOrdered,
  26};
  27use git::{
  28    BuildPermalinkParams, GitHostingProviderRegistry, Oid, RunHook,
  29    blame::Blame,
  30    parse_git_remote_url,
  31    repository::{
  32        Branch, CommitDetails, CommitDiff, CommitFile, CommitOptions, DiffType, FetchOptions,
  33        GitRepository, GitRepositoryCheckpoint, GraphCommitData, InitialGraphCommitData, LogOrder,
  34        LogSource, PushOptions, Remote, RemoteCommandOutput, RepoPath, ResetMode,
  35        UpstreamTrackingStatus, Worktree as GitWorktree,
  36    },
  37    stash::{GitStash, StashEntry},
  38    status::{
  39        DiffTreeType, FileStatus, GitSummary, StatusCode, TrackedStatus, TreeDiff, TreeDiffStatus,
  40        UnmergedStatus, UnmergedStatusCode,
  41    },
  42};
  43use gpui::{
  44    App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Subscription, Task,
  45    WeakEntity,
  46};
  47use language::{
  48    Buffer, BufferEvent, Language, LanguageRegistry,
  49    proto::{deserialize_version, serialize_version},
  50};
  51use parking_lot::Mutex;
  52use pending_op::{PendingOp, PendingOpId, PendingOps, PendingOpsSummary};
  53use postage::stream::Stream as _;
  54use rpc::{
  55    AnyProtoClient, TypedEnvelope,
  56    proto::{self, git_reset, split_repository_update},
  57};
  58use serde::Deserialize;
  59use settings::WorktreeId;
  60use smol::future::yield_now;
  61use std::{
  62    cmp::Ordering,
  63    collections::{BTreeSet, HashSet, VecDeque},
  64    future::Future,
  65    mem,
  66    ops::Range,
  67    path::{Path, PathBuf},
  68    str::FromStr,
  69    sync::{
  70        Arc,
  71        atomic::{self, AtomicU64},
  72    },
  73    time::Instant,
  74};
  75use sum_tree::{Edit, SumTree, TreeSet};
  76use task::Shell;
  77use text::{Bias, BufferId};
  78use util::{
  79    ResultExt, debug_panic,
  80    paths::{PathStyle, SanitizedPath},
  81    post_inc,
  82    rel_path::RelPath,
  83};
  84use worktree::{
  85    File, PathChange, PathKey, PathProgress, PathSummary, PathTarget, ProjectEntryId,
  86    UpdatedGitRepositoriesSet, UpdatedGitRepository, Worktree,
  87};
  88use zeroize::Zeroize;
  89
/// Project-level store for git repositories and the buffer diffs derived from them.
///
/// Instances are built through [`GitStore::local`] or [`GitStore::remote`]; the
/// variant of `state` records which of the two was used.
pub struct GitStore {
    /// Local or remote backing state, including the optional downstream client
    /// installed by `shared` and cleared by `unshared`.
    state: GitStoreState,
    /// Buffer store whose events this store subscribes to in `new`.
    buffer_store: Entity<BufferStore>,
    /// Worktree store whose events this store subscribes to in `new`.
    worktree_store: Entity<WorktreeStore>,
    /// Known repositories, keyed by id.
    repositories: HashMap<RepositoryId, Entity<Repository>>,
    /// Worktree ids recorded per repository id.
    worktree_ids: HashMap<RepositoryId, HashSet<WorktreeId>>,
    /// Id of the repository handed out by `active_repository`, if one is selected.
    active_repo_id: Option<RepositoryId>,
    /// Diff loads that are still in flight, keyed by buffer and diff kind. The
    /// tasks are `Shared` so that repeated opens of the same diff await one load.
    #[allow(clippy::type_complexity)]
    loading_diffs:
        HashMap<(BufferId, DiffKind), Shared<Task<Result<Entity<BufferDiff>, Arc<anyhow::Error>>>>>,
    /// Git state (diffs, conflict set, base texts) per buffer.
    diffs: HashMap<BufferId, Entity<BufferGitState>>,
    /// Diffs held per peer and buffer; emptied by `unshared` and, for a single
    /// peer, by `forget_shared_diffs_for`.
    shared_diffs: HashMap<proto::PeerId, HashMap<BufferId, SharedDiffs>>,
    /// Holds the subscriptions created in `new`.
    _subscriptions: Vec<Subscription>,
}
 104
/// The diffs stored for one buffer in `GitStore::shared_diffs`.
///
/// Unlike `BufferGitState`, which keeps `WeakEntity` handles, these are
/// `Entity` handles.
#[derive(Default)]
struct SharedDiffs {
    /// Unstaged diff for the buffer, if one is held.
    unstaged: Option<Entity<BufferDiff>>,
    /// Uncommitted diff for the buffer, if one is held.
    uncommitted: Option<Entity<BufferDiff>>,
}
 110
/// Git-related state tracked for a single buffer: its diffs, conflict set,
/// cached base texts, and the bookkeeping used when recalculating them.
struct BufferGitState {
    /// Weak handle to the unstaged diff; `GitStore::open_unstaged_diff` reuses
    /// it for as long as it can still be upgraded.
    unstaged_diff: Option<WeakEntity<BufferDiff>>,
    /// Weak handle to the uncommitted diff; `GitStore::open_uncommitted_diff`
    /// reuses it for as long as it can still be upgraded.
    uncommitted_diff: Option<WeakEntity<BufferDiff>>,
    /// Weak handle to the buffer's conflict set, if any.
    conflict_set: Option<WeakEntity<ConflictSet>>,
    /// Task slot for a diff recalculation.
    recalculate_diff_task: Option<Task<Result<()>>>,
    /// Task slot for reparsing conflict markers.
    reparse_conflict_markers_task: Option<Task<Result<()>>>,
    /// Language recorded for the buffer (set in `GitStore::open_diff_internal`).
    language: Option<Arc<Language>>,
    /// Language registry recorded for the buffer (set in
    /// `GitStore::open_diff_internal`).
    language_registry: Option<Arc<LanguageRegistry>>,
    /// Senders presumably fired once the conflict set has been updated —
    /// TODO confirm against the code that drains this list.
    conflict_updated_futures: Vec<oneshot::Sender<()>>,
    /// Watch channel sender; presumably carries whether a recalculation is in
    /// progress — TODO confirm.
    recalculating_tx: postage::watch::Sender<bool>,

    /// These operation counts are used to ensure that head and index text
    /// values read from the git repository are up-to-date with any hunk staging
    /// operations that have been performed on the BufferDiff.
    ///
    /// The operation count is incremented immediately when the user initiates a
    /// hunk stage/unstage operation. Then, upon finishing writing the new index
    /// text to disk, the `operation count as of write` is updated to reflect
    /// the operation count that prompted the write.
    hunk_staging_operation_count: usize,
    hunk_staging_operation_count_as_of_write: usize,

    /// Cached head text for the buffer, if known.
    head_text: Option<Arc<str>>,
    /// Cached index text for the buffer, if known.
    index_text: Option<Arc<str>>,
    /// Dirty flag for the head text; presumably consumed by the next diff
    /// recalculation — TODO confirm.
    head_changed: bool,
    /// Dirty flag for the index text; presumably consumed by the next diff
    /// recalculation — TODO confirm.
    index_changed: bool,
    /// Dirty flag for the language; set to `true` in
    /// `GitStore::open_diff_internal`.
    language_changed: bool,
}
 139
/// A change to the base texts that a buffer is diffed against.
#[derive(Clone, Debug)]
enum DiffBasesChange {
    /// Carries a new index text. `GitStore::open_unstaged_diff` wraps the
    /// loaded staged text in this variant.
    SetIndex(Option<String>),
    /// Carries a new head text.
    SetHead(Option<String>),
    /// Carries separate index and head texts.
    SetEach {
        index: Option<String>,
        head: Option<String>,
    },
    /// Carries a single text; presumably applied to both index and head —
    /// TODO confirm against the consumer of this enum.
    SetBoth(Option<String>),
}
 150
/// Which kind of diff is being opened; part of the key of
/// `GitStore::loading_diffs`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum DiffKind {
    /// Used by `GitStore::open_unstaged_diff`.
    Unstaged,
    /// Used by `GitStore::open_uncommitted_diff`.
    Uncommitted,
}
 156
/// Backing state of a [`GitStore`]: either local, or remote with an upstream
/// client.
enum GitStoreState {
    /// Constructed by `GitStore::local`.
    Local {
        /// Atomic counter, initialised to 1 in `GitStore::local`; presumably the
        /// source of ids for newly added repositories — TODO confirm.
        next_repository_id: Arc<AtomicU64>,
        /// Installed by `GitStore::shared`, cleared by `GitStore::unshared`.
        downstream: Option<LocalDownstreamState>,
        /// Environment entity passed to `GitStore::local`.
        project_environment: Entity<ProjectEnvironment>,
        /// Filesystem handle passed to `GitStore::local`.
        fs: Arc<dyn Fs>,
    },
    /// Constructed by `GitStore::remote`.
    Remote {
        /// Client passed to `GitStore::remote`.
        upstream_client: AnyProtoClient,
        /// Project id passed to `GitStore::remote`.
        upstream_project_id: u64,
        /// Downstream client and project id recorded by `GitStore::shared`,
        /// cleared by `GitStore::unshared`.
        downstream: Option<(AnyProtoClient, ProjectId)>,
    },
}
 170
/// Message queued for the downstream-forwarding task spawned by
/// `GitStore::shared` on a local store.
enum DownstreamUpdate {
    /// A new snapshot of a repository; forwarded as an initial update the first
    /// time its id is seen and as an update built against the previously sent
    /// snapshot afterwards.
    UpdateRepository(RepositorySnapshot),
    /// Forwarded as a `proto::RemoveRepository` message for the given id.
    RemoveRepository(RepositoryId),
}
 175
/// Downstream sharing state of a local [`GitStore`], created in
/// `GitStore::shared`.
struct LocalDownstreamState {
    /// Client that updates are sent to.
    client: AnyProtoClient,
    /// Id of the shared project.
    project_id: ProjectId,
    /// Sending half of the channel read by `_task`.
    updates_tx: mpsc::UnboundedSender<DownstreamUpdate>,
    /// Task that reads queued updates and sends them to `client`.
    _task: Task<Result<()>>,
}
 182
/// A set of repository checkpoints, one per repository working directory.
#[derive(Clone, Debug)]
pub struct GitStoreCheckpoint {
    /// Checkpoints keyed by the absolute path of the repository's work directory.
    checkpoints_by_work_dir_abs_path: HashMap<Arc<Path>, GitRepositoryCheckpoint>,
}
 187
/// The git status of a single path in a repository.
///
/// Entries are stored in a `SumTree` keyed by `repo_path` (see the
/// `sum_tree::Item` and `sum_tree::KeyedItem` impls).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StatusEntry {
    /// Path of the entry within the repository.
    pub repo_path: RepoPath,
    /// Status reported for that path.
    pub status: FileStatus,
}
 193
 194impl StatusEntry {
 195    fn to_proto(&self) -> proto::StatusEntry {
 196        let simple_status = match self.status {
 197            FileStatus::Ignored | FileStatus::Untracked => proto::GitStatus::Added as i32,
 198            FileStatus::Unmerged { .. } => proto::GitStatus::Conflict as i32,
 199            FileStatus::Tracked(TrackedStatus {
 200                index_status,
 201                worktree_status,
 202            }) => tracked_status_to_proto(if worktree_status != StatusCode::Unmodified {
 203                worktree_status
 204            } else {
 205                index_status
 206            }),
 207        };
 208
 209        proto::StatusEntry {
 210            repo_path: self.repo_path.to_proto(),
 211            simple_status,
 212            status: Some(status_to_proto(self.status)),
 213        }
 214    }
 215}
 216
 217impl TryFrom<proto::StatusEntry> for StatusEntry {
 218    type Error = anyhow::Error;
 219
 220    fn try_from(value: proto::StatusEntry) -> Result<Self, Self::Error> {
 221        let repo_path = RepoPath::from_proto(&value.repo_path).context("invalid repo path")?;
 222        let status = status_from_proto(value.simple_status, value.status)?;
 223        Ok(Self { repo_path, status })
 224    }
 225}
 226
 227impl sum_tree::Item for StatusEntry {
 228    type Summary = PathSummary<GitSummary>;
 229
 230    fn summary(&self, _: <Self::Summary as sum_tree::Summary>::Context<'_>) -> Self::Summary {
 231        PathSummary {
 232            max_path: self.repo_path.as_ref().clone(),
 233            item_summary: self.status.summary(),
 234        }
 235    }
 236}
 237
 238impl sum_tree::KeyedItem for StatusEntry {
 239    type Key = PathKey;
 240
 241    fn key(&self) -> Self::Key {
 242        PathKey(self.repo_path.as_ref().clone())
 243    }
 244}
 245
/// Identifier of a repository; used as the key of the repository map in
/// [`GitStore`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct RepositoryId(pub u64);
 248
/// Merge-related details stored in a [`RepositorySnapshot`].
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct MergeDetails {
    /// Paths recorded as conflicted.
    pub conflicted_paths: TreeSet<RepoPath>,
    /// Merge message, if there is one.
    pub message: Option<SharedString>,
    /// Merge heads; individual entries may be absent.
    pub heads: Vec<Option<SharedString>>,
}
 255
/// Load state of the graph data for one commit (see `Repository::commit_data`).
#[derive(Clone)]
pub enum CommitDataState {
    /// The data is not available yet.
    Loading,
    /// The data is available.
    Loaded(Arc<GraphCommitData>),
}
 261
/// A cloneable, comparable snapshot of a repository's state.
///
/// [`Repository`] derefs to its current snapshot, and `GitStore::shared`
/// sends snapshots downstream.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RepositorySnapshot {
    /// Id of the repository this snapshot belongs to.
    pub id: RepositoryId,
    /// Status entries, ordered and keyed by repository path.
    pub statuses_by_path: SumTree<StatusEntry>,
    /// Absolute path of the repository's work directory.
    pub work_directory_abs_path: Arc<Path>,
    /// Path style used by this repository's paths.
    pub path_style: PathStyle,
    /// Current branch, if any.
    pub branch: Option<Branch>,
    /// Details of the head commit, if any.
    pub head_commit: Option<CommitDetails>,
    /// Scan counter; presumably bumped on each completed scan — TODO confirm.
    pub scan_id: u64,
    /// Merge state (conflicted paths, message, heads).
    pub merge: MergeDetails,
    /// URL of the `origin` remote, if known.
    pub remote_origin_url: Option<String>,
    /// URL of the `upstream` remote, if known.
    pub remote_upstream_url: Option<String>,
    /// Stash entries of the repository.
    pub stash_entries: GitStash,
}
 276
/// Identifier of a job; key type of `Repository::active_jobs`.
type JobId = u64;
 278
/// Information recorded for an active job (see `Repository::active_jobs`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct JobInfo {
    /// Instant associated with the start of the job.
    pub start: Instant,
    /// Message describing the job.
    pub message: SharedString,
}
 284
/// Handle to an open graph-commit-data handler: a task plus the channel used to
/// request data for individual commits.
struct GraphCommitDataHandler {
    /// Task held for as long as the handler exists.
    _task: Task<()>,
    /// Sender for commit ids whose data is requested.
    commit_data_request: smol::channel::Sender<Oid>,
}
 289
/// Lifecycle state of a repository's graph-commit-data handler.
enum GraphCommitHandlerState {
    /// No handler is available yet; presumably one is being started —
    /// TODO confirm.
    Starting,
    /// A handler is open and can receive requests.
    Open(GraphCommitDataHandler),
    /// There is no open handler.
    Closed,
}
 295
/// A git repository tracked by a [`GitStore`].
///
/// Derefs to its current [`RepositorySnapshot`].
pub struct Repository {
    /// Weak handle to this repository's own entity.
    this: WeakEntity<Self>,
    /// Current snapshot; the target of the `Deref` impl.
    snapshot: RepositorySnapshot,
    /// Buffer for the commit message, if one has been opened.
    commit_message_buffer: Option<Entity<Buffer>>,
    /// Weak handle to the owning store.
    git_store: WeakEntity<GitStore>,
    // For a local repository, holds paths that have had worktree events since the last status scan completed,
    // and that should be examined during the next status scan.
    paths_needing_status_update: BTreeSet<RepoPath>,
    /// Sending half of the channel that `GitJob`s are queued on.
    job_sender: mpsc::UnboundedSender<GitJob>,
    /// Jobs currently recorded as active, keyed by job id.
    active_jobs: HashMap<JobId, JobInfo>,
    /// Pending operations tracked for this repository.
    pending_ops: SumTree<PendingOps>,
    /// Job id counter; presumably the id handed to the next job — TODO confirm.
    job_id: JobId,
    /// Askpass delegates keyed by numeric id.
    askpass_delegates: Arc<Mutex<HashMap<u64, AskPassDelegate>>>,
    /// Askpass id counter; presumably the most recently issued id — TODO confirm.
    latest_askpass_id: u64,
    /// Shared task resolving to the local or remote state of this repository,
    /// or to an error message.
    repository_state: Shared<Task<Result<RepositoryState, String>>>,
    /// Initial graph data per `(order, source)` pair: the task associated with
    /// it and the commits collected so far.
    pub initial_graph_data: HashMap<
        (LogOrder, LogSource),
        (
            Task<Result<(), SharedString>>,
            Vec<Arc<InitialGraphCommitData>>,
        ),
    >,
    /// State of the handler that serves per-commit graph data.
    graph_commit_data_handler: GraphCommitHandlerState,
    /// Per-commit graph data, either loading or loaded.
    commit_data: HashMap<Oid, CommitDataState>,
}
 321
 322impl std::ops::Deref for Repository {
 323    type Target = RepositorySnapshot;
 324
 325    fn deref(&self) -> &Self::Target {
 326        &self.snapshot
 327    }
 328}
 329
/// State of a repository that is opened locally (see
/// `LocalRepositoryState::new`).
#[derive(Clone)]
pub struct LocalRepositoryState {
    /// Filesystem the repository was opened through.
    pub fs: Arc<dyn Fs>,
    /// Repository backend returned by `Fs::open_repo`.
    pub backend: Arc<dyn GitRepository>,
    /// Environment of the work directory; empty when it could not be loaded.
    pub environment: Arc<HashMap<String, String>>,
}
 336
 337impl LocalRepositoryState {
 338    async fn new(
 339        work_directory_abs_path: Arc<Path>,
 340        dot_git_abs_path: Arc<Path>,
 341        project_environment: WeakEntity<ProjectEnvironment>,
 342        fs: Arc<dyn Fs>,
 343        cx: &mut AsyncApp,
 344    ) -> anyhow::Result<Self> {
 345        let environment = project_environment
 346                .update(cx, |project_environment, cx| {
 347                    project_environment.local_directory_environment(&Shell::System, work_directory_abs_path.clone(), cx)
 348                })?
 349                .await
 350                .unwrap_or_else(|| {
 351                    log::error!("failed to get working directory environment for repository {work_directory_abs_path:?}");
 352                    HashMap::default()
 353                });
 354        let search_paths = environment.get("PATH").map(|val| val.to_owned());
 355        let backend = cx
 356            .background_spawn({
 357                let fs = fs.clone();
 358                async move {
 359                    let system_git_binary_path = search_paths
 360                        .and_then(|search_paths| {
 361                            which::which_in("git", Some(search_paths), &work_directory_abs_path)
 362                                .ok()
 363                        })
 364                        .or_else(|| which::which("git").ok());
 365                    fs.open_repo(&dot_git_abs_path, system_git_binary_path.as_deref())
 366                        .with_context(|| format!("opening repository at {dot_git_abs_path:?}"))
 367                }
 368            })
 369            .await?;
 370        Ok(LocalRepositoryState {
 371            backend,
 372            environment: Arc::new(environment),
 373            fs,
 374        })
 375    }
 376}
 377
/// State of a repository that is accessed through a client rather than opened
/// locally.
#[derive(Clone)]
pub struct RemoteRepositoryState {
    /// Id of the project the repository belongs to.
    pub project_id: ProjectId,
    /// Client used to reach the repository.
    pub client: AnyProtoClient,
}
 383
/// Local or remote state of a repository; this is the value passed to the
/// closure stored in a `GitJob`.
#[derive(Clone)]
pub enum RepositoryState {
    /// The repository is opened locally.
    Local(LocalRepositoryState),
    /// The repository is reached through a client.
    Remote(RemoteRepositoryState),
}
 389
/// Events emitted by a [`Repository`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RepositoryEvent {
    /// File statuses changed.
    StatusesChanged,
    /// Merge heads changed.
    MergeHeadsChanged,
    /// The branch changed.
    BranchChanged,
    /// Stash entries changed.
    StashEntriesChanged,
    /// Pending operations changed; carries the pending operations.
    PendingOpsChanged { pending_ops: SumTree<PendingOps> },
    /// A git graph count was updated for the given `(order, source)` pair;
    /// carries the count.
    GitGraphCountUpdated((LogOrder, LogSource), usize),
}
 399
/// Event emitted by a [`Repository`]; carries no payload.
#[derive(Clone, Debug)]
pub struct JobsUpdated;
 402
/// Events emitted by a [`GitStore`].
#[derive(Debug)]
pub enum GitStoreEvent {
    /// The active repository changed; carries the new active repository id.
    ActiveRepositoryChanged(Option<RepositoryId>),
    /// Bool is true when the repository that's updated is the active repository
    RepositoryUpdated(RepositoryId, RepositoryEvent, bool),
    /// A repository was added.
    RepositoryAdded,
    /// The repository with the given id was removed.
    RepositoryRemoved(RepositoryId),
    /// Writing the index failed; carries the error.
    IndexWriteError(anyhow::Error),
    /// Jobs were updated.
    JobsUpdated,
    /// Conflicts were updated.
    ConflictsUpdated,
}
 414
// Event types that `Repository` and `GitStore` entities can emit.
impl EventEmitter<RepositoryEvent> for Repository {}
impl EventEmitter<JobsUpdated> for Repository {}
impl EventEmitter<GitStoreEvent> for GitStore {}
 418
/// A unit of work queued on a repository's job channel
/// (`Repository::job_sender`).
pub struct GitJob {
    /// The work itself: called once with the repository state and returns the
    /// task that performs the job.
    job: Box<dyn FnOnce(RepositoryState, &mut AsyncApp) -> Task<()>>,
    /// Optional key identifying the kind of job; presumably used to coalesce
    /// queued jobs with equal keys — TODO confirm against the job loop.
    key: Option<GitJobKey>,
}
 423
/// Key describing what a [`GitJob`] does; comparable for equality.
#[derive(PartialEq, Eq)]
enum GitJobKey {
    /// Writes the index for the given paths.
    WriteIndex(Vec<RepoPath>),
    /// Reloads buffer diff bases.
    ReloadBufferDiffBases,
    /// Refreshes statuses.
    RefreshStatuses,
    /// Reloads git state.
    ReloadGitState,
}
 431
 432impl GitStore {
 433    pub fn local(
 434        worktree_store: &Entity<WorktreeStore>,
 435        buffer_store: Entity<BufferStore>,
 436        environment: Entity<ProjectEnvironment>,
 437        fs: Arc<dyn Fs>,
 438        cx: &mut Context<Self>,
 439    ) -> Self {
 440        Self::new(
 441            worktree_store.clone(),
 442            buffer_store,
 443            GitStoreState::Local {
 444                next_repository_id: Arc::new(AtomicU64::new(1)),
 445                downstream: None,
 446                project_environment: environment,
 447                fs,
 448            },
 449            cx,
 450        )
 451    }
 452
 453    pub fn remote(
 454        worktree_store: &Entity<WorktreeStore>,
 455        buffer_store: Entity<BufferStore>,
 456        upstream_client: AnyProtoClient,
 457        project_id: u64,
 458        cx: &mut Context<Self>,
 459    ) -> Self {
 460        Self::new(
 461            worktree_store.clone(),
 462            buffer_store,
 463            GitStoreState::Remote {
 464                upstream_client,
 465                upstream_project_id: project_id,
 466                downstream: None,
 467            },
 468            cx,
 469        )
 470    }
 471
 472    fn new(
 473        worktree_store: Entity<WorktreeStore>,
 474        buffer_store: Entity<BufferStore>,
 475        state: GitStoreState,
 476        cx: &mut Context<Self>,
 477    ) -> Self {
 478        let _subscriptions = vec![
 479            cx.subscribe(&worktree_store, Self::on_worktree_store_event),
 480            cx.subscribe(&buffer_store, Self::on_buffer_store_event),
 481        ];
 482
 483        GitStore {
 484            state,
 485            buffer_store,
 486            worktree_store,
 487            repositories: HashMap::default(),
 488            worktree_ids: HashMap::default(),
 489            active_repo_id: None,
 490            _subscriptions,
 491            loading_diffs: HashMap::default(),
 492            shared_diffs: HashMap::default(),
 493            diffs: HashMap::default(),
 494        }
 495    }
 496
    /// Registers this store's git request and message handlers on `client`.
    ///
    /// Most entries are request handlers; `handle_update_diff_bases`,
    /// `handle_update_repository` and `handle_remove_repository` are registered
    /// as message handlers instead.
    pub fn init(client: &AnyProtoClient) {
        // Remotes and branches.
        client.add_entity_request_handler(Self::handle_get_remotes);
        client.add_entity_request_handler(Self::handle_get_branches);
        client.add_entity_request_handler(Self::handle_get_default_branch);
        client.add_entity_request_handler(Self::handle_change_branch);
        client.add_entity_request_handler(Self::handle_create_branch);
        client.add_entity_request_handler(Self::handle_rename_branch);
        client.add_entity_request_handler(Self::handle_create_remote);
        client.add_entity_request_handler(Self::handle_remove_remote);
        client.add_entity_request_handler(Self::handle_delete_branch);
        // Repository-level operations.
        client.add_entity_request_handler(Self::handle_git_init);
        client.add_entity_request_handler(Self::handle_push);
        client.add_entity_request_handler(Self::handle_pull);
        client.add_entity_request_handler(Self::handle_fetch);
        client.add_entity_request_handler(Self::handle_stage);
        client.add_entity_request_handler(Self::handle_unstage);
        client.add_entity_request_handler(Self::handle_stash);
        client.add_entity_request_handler(Self::handle_stash_pop);
        client.add_entity_request_handler(Self::handle_stash_apply);
        client.add_entity_request_handler(Self::handle_stash_drop);
        client.add_entity_request_handler(Self::handle_commit);
        client.add_entity_request_handler(Self::handle_run_hook);
        client.add_entity_request_handler(Self::handle_reset);
        client.add_entity_request_handler(Self::handle_show);
        client.add_entity_request_handler(Self::handle_load_commit_diff);
        client.add_entity_request_handler(Self::handle_file_history);
        client.add_entity_request_handler(Self::handle_checkout_files);
        client.add_entity_request_handler(Self::handle_open_commit_message_buffer);
        client.add_entity_request_handler(Self::handle_set_index_text);
        client.add_entity_request_handler(Self::handle_askpass);
        client.add_entity_request_handler(Self::handle_check_for_pushed_commits);
        // Diffs, blobs and blame.
        client.add_entity_request_handler(Self::handle_git_diff);
        client.add_entity_request_handler(Self::handle_tree_diff);
        client.add_entity_request_handler(Self::handle_get_blob_content);
        client.add_entity_request_handler(Self::handle_open_unstaged_diff);
        client.add_entity_request_handler(Self::handle_open_uncommitted_diff);
        client.add_entity_message_handler(Self::handle_update_diff_bases);
        client.add_entity_request_handler(Self::handle_get_permalink_to_line);
        client.add_entity_request_handler(Self::handle_blame_buffer);
        // Repository updates, cloning and worktrees.
        client.add_entity_message_handler(Self::handle_update_repository);
        client.add_entity_message_handler(Self::handle_remove_repository);
        client.add_entity_request_handler(Self::handle_git_clone);
        client.add_entity_request_handler(Self::handle_get_worktrees);
        client.add_entity_request_handler(Self::handle_create_worktree);
    }
 542
 543    pub fn is_local(&self) -> bool {
 544        matches!(self.state, GitStoreState::Local { .. })
 545    }
 546    pub fn set_active_repo_for_path(&mut self, project_path: &ProjectPath, cx: &mut Context<Self>) {
 547        if let Some((repo, _)) = self.repository_and_path_for_project_path(project_path, cx) {
 548            let id = repo.read(cx).id;
 549            if self.active_repo_id != Some(id) {
 550                self.active_repo_id = Some(id);
 551                cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
 552            }
 553        }
 554    }
 555
    /// Starts forwarding repository state to a downstream `client` for
    /// `project_id`.
    ///
    /// * Remote store: immediately sends an initial update for every known
    ///   repository (send failures are logged) and records the client and
    ///   project id as the downstream.
    /// * Local store: queues the current snapshot of every repository on a
    ///   fresh channel and spawns a task that converts queued
    ///   `DownstreamUpdate`s into proto messages. When that task's loop ends,
    ///   the downstream state is cleared again.
    pub fn shared(&mut self, project_id: u64, client: AnyProtoClient, cx: &mut Context<Self>) {
        match &mut self.state {
            GitStoreState::Remote {
                downstream: downstream_client,
                ..
            } => {
                for repo in self.repositories.values() {
                    let update = repo.read(cx).snapshot.initial_update(project_id);
                    for update in split_repository_update(update) {
                        client.send(update).log_err();
                    }
                }
                *downstream_client = Some((client, ProjectId(project_id)));
            }
            GitStoreState::Local {
                downstream: downstream_client,
                ..
            } => {
                // Last snapshot sent per repository id; lives inside the
                // background task below.
                let mut snapshots = HashMap::default();
                let (updates_tx, mut updates_rx) = mpsc::unbounded();
                // Seed the channel with the current state of every repository.
                // Send errors are ignored.
                for repo in self.repositories.values() {
                    updates_tx
                        .unbounded_send(DownstreamUpdate::UpdateRepository(
                            repo.read(cx).snapshot.clone(),
                        ))
                        .ok();
                }
                *downstream_client = Some(LocalDownstreamState {
                    client: client.clone(),
                    project_id: ProjectId(project_id),
                    updates_tx,
                    _task: cx.spawn(async move |this, cx| {
                        cx.background_spawn(async move {
                            // Runs until the channel yields `None` or a send
                            // to the client fails.
                            while let Some(update) = updates_rx.next().await {
                                match update {
                                    DownstreamUpdate::UpdateRepository(snapshot) => {
                                        if let Some(old_snapshot) = snapshots.get_mut(&snapshot.id)
                                        {
                                            // Already sent once: build the update
                                            // against the previously sent snapshot.
                                            let update =
                                                snapshot.build_update(old_snapshot, project_id);
                                            *old_snapshot = snapshot;
                                            for update in split_repository_update(update) {
                                                client.send(update)?;
                                            }
                                        } else {
                                            // First time this repository id is
                                            // seen: send the initial update.
                                            let update = snapshot.initial_update(project_id);
                                            for update in split_repository_update(update) {
                                                client.send(update)?;
                                            }
                                            snapshots.insert(snapshot.id, snapshot);
                                        }
                                    }
                                    DownstreamUpdate::RemoveRepository(id) => {
                                        // NOTE(review): the cached entry in
                                        // `snapshots` is not removed here —
                                        // confirm that is intended.
                                        client.send(proto::RemoveRepository {
                                            project_id,
                                            id: id.to_proto(),
                                        })?;
                                    }
                                }
                            }
                            anyhow::Ok(())
                        })
                        .await
                        .ok();
                        // The forwarding loop is over (its error, if any, was
                        // discarded above): drop the downstream state.
                        this.update(cx, |this, _| {
                            if let GitStoreState::Local {
                                downstream: downstream_client,
                                ..
                            } = &mut this.state
                            {
                                downstream_client.take();
                            } else {
                                unreachable!("unshared called on remote store");
                            }
                        })
                    }),
                });
            }
        }
    }
 636
 637    pub fn unshared(&mut self, _cx: &mut Context<Self>) {
 638        match &mut self.state {
 639            GitStoreState::Local {
 640                downstream: downstream_client,
 641                ..
 642            } => {
 643                downstream_client.take();
 644            }
 645            GitStoreState::Remote {
 646                downstream: downstream_client,
 647                ..
 648            } => {
 649                downstream_client.take();
 650            }
 651        }
 652        self.shared_diffs.clear();
 653    }
 654
 655    pub(crate) fn forget_shared_diffs_for(&mut self, peer_id: &proto::PeerId) {
 656        self.shared_diffs.remove(peer_id);
 657    }
 658
 659    pub fn active_repository(&self) -> Option<Entity<Repository>> {
 660        self.active_repo_id
 661            .as_ref()
 662            .map(|id| self.repositories[id].clone())
 663    }
 664
 665    pub fn open_unstaged_diff(
 666        &mut self,
 667        buffer: Entity<Buffer>,
 668        cx: &mut Context<Self>,
 669    ) -> Task<Result<Entity<BufferDiff>>> {
 670        let buffer_id = buffer.read(cx).remote_id();
 671        if let Some(diff_state) = self.diffs.get(&buffer_id)
 672            && let Some(unstaged_diff) = diff_state
 673                .read(cx)
 674                .unstaged_diff
 675                .as_ref()
 676                .and_then(|weak| weak.upgrade())
 677        {
 678            if let Some(task) =
 679                diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
 680            {
 681                return cx.background_executor().spawn(async move {
 682                    task.await;
 683                    Ok(unstaged_diff)
 684                });
 685            }
 686            return Task::ready(Ok(unstaged_diff));
 687        }
 688
 689        let Some((repo, repo_path)) =
 690            self.repository_and_path_for_buffer_id(buffer.read(cx).remote_id(), cx)
 691        else {
 692            return Task::ready(Err(anyhow!("failed to find git repository for buffer")));
 693        };
 694
 695        let task = self
 696            .loading_diffs
 697            .entry((buffer_id, DiffKind::Unstaged))
 698            .or_insert_with(|| {
 699                let staged_text = repo.update(cx, |repo, cx| {
 700                    repo.load_staged_text(buffer_id, repo_path, cx)
 701                });
 702                cx.spawn(async move |this, cx| {
 703                    Self::open_diff_internal(
 704                        this,
 705                        DiffKind::Unstaged,
 706                        staged_text.await.map(DiffBasesChange::SetIndex),
 707                        buffer,
 708                        cx,
 709                    )
 710                    .await
 711                    .map_err(Arc::new)
 712                })
 713                .shared()
 714            })
 715            .clone();
 716
 717        cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
 718    }
 719
    /// Builds a new diff of `buffer` against the blob identified by `oid`.
    ///
    /// With `oid == None` the diff gets no base text. The buffer's unstaged
    /// diff is opened as well and attached as the new diff's secondary diff,
    /// and this store subscribes to the new diff's events.
    ///
    /// # Errors
    ///
    /// The returned task fails when the blob content cannot be loaded, when
    /// setting the base text fails, when the unstaged diff cannot be opened, or
    /// when this store can no longer be updated.
    pub fn open_diff_since(
        &mut self,
        oid: Option<git::Oid>,
        buffer: Entity<Buffer>,
        repo: Entity<Repository>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<BufferDiff>>> {
        cx.spawn(async move |this, cx| {
            let buffer_snapshot = buffer.update(cx, |buffer, _| buffer.snapshot());
            let language_registry = buffer.update(cx, |buffer, _| buffer.language_registry());
            // Base text: the blob's content, or nothing when no oid was given.
            let content = match oid {
                None => None,
                Some(oid) => Some(
                    repo.update(cx, |repo, cx| repo.load_blob_content(oid, cx))
                        .await?,
                ),
            };
            let buffer_diff = cx.new(|cx| BufferDiff::new(&buffer_snapshot, cx));

            buffer_diff
                .update(cx, |buffer_diff, cx| {
                    buffer_diff.language_changed(
                        buffer_snapshot.language().cloned(),
                        language_registry,
                        cx,
                    );
                    buffer_diff.set_base_text(
                        content.map(|s| s.as_str().into()),
                        buffer_snapshot.language().cloned(),
                        buffer_snapshot.text,
                        cx,
                    )
                })
                .await?;
            // Attach the unstaged diff as the secondary diff.
            let unstaged_diff = this
                .update(cx, |this, cx| this.open_unstaged_diff(buffer.clone(), cx))?
                .await?;
            buffer_diff.update(cx, |buffer_diff, _| {
                buffer_diff.set_secondary_diff(unstaged_diff);
            });

            // Route the new diff's events to `on_buffer_diff_event`; the
            // subscription is detached rather than stored.
            this.update(cx, |_, cx| {
                cx.subscribe(&buffer_diff, Self::on_buffer_diff_event)
                    .detach();
            })?;

            Ok(buffer_diff)
        })
    }
 769
 770    #[ztracing::instrument(skip_all)]
 771    pub fn open_uncommitted_diff(
 772        &mut self,
 773        buffer: Entity<Buffer>,
 774        cx: &mut Context<Self>,
 775    ) -> Task<Result<Entity<BufferDiff>>> {
 776        let buffer_id = buffer.read(cx).remote_id();
 777
 778        if let Some(diff_state) = self.diffs.get(&buffer_id)
 779            && let Some(uncommitted_diff) = diff_state
 780                .read(cx)
 781                .uncommitted_diff
 782                .as_ref()
 783                .and_then(|weak| weak.upgrade())
 784        {
 785            if let Some(task) =
 786                diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
 787            {
 788                return cx.background_executor().spawn(async move {
 789                    task.await;
 790                    Ok(uncommitted_diff)
 791                });
 792            }
 793            return Task::ready(Ok(uncommitted_diff));
 794        }
 795
 796        let Some((repo, repo_path)) =
 797            self.repository_and_path_for_buffer_id(buffer.read(cx).remote_id(), cx)
 798        else {
 799            return Task::ready(Err(anyhow!("failed to find git repository for buffer")));
 800        };
 801
 802        let task = self
 803            .loading_diffs
 804            .entry((buffer_id, DiffKind::Uncommitted))
 805            .or_insert_with(|| {
 806                let changes = repo.update(cx, |repo, cx| {
 807                    repo.load_committed_text(buffer_id, repo_path, cx)
 808                });
 809
 810                // todo(lw): hot foreground spawn
 811                cx.spawn(async move |this, cx| {
 812                    Self::open_diff_internal(this, DiffKind::Uncommitted, changes.await, buffer, cx)
 813                        .await
 814                        .map_err(Arc::new)
 815                })
 816                .shared()
 817            })
 818            .clone();
 819
 820        cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
 821    }
 822
 823    #[ztracing::instrument(skip_all)]
 824    async fn open_diff_internal(
 825        this: WeakEntity<Self>,
 826        kind: DiffKind,
 827        texts: Result<DiffBasesChange>,
 828        buffer_entity: Entity<Buffer>,
 829        cx: &mut AsyncApp,
 830    ) -> Result<Entity<BufferDiff>> {
 831        let diff_bases_change = match texts {
 832            Err(e) => {
 833                this.update(cx, |this, cx| {
 834                    let buffer = buffer_entity.read(cx);
 835                    let buffer_id = buffer.remote_id();
 836                    this.loading_diffs.remove(&(buffer_id, kind));
 837                })?;
 838                return Err(e);
 839            }
 840            Ok(change) => change,
 841        };
 842
 843        this.update(cx, |this, cx| {
 844            let buffer = buffer_entity.read(cx);
 845            let buffer_id = buffer.remote_id();
 846            let language = buffer.language().cloned();
 847            let language_registry = buffer.language_registry();
 848            let text_snapshot = buffer.text_snapshot();
 849            this.loading_diffs.remove(&(buffer_id, kind));
 850
 851            let git_store = cx.weak_entity();
 852            let diff_state = this
 853                .diffs
 854                .entry(buffer_id)
 855                .or_insert_with(|| cx.new(|_| BufferGitState::new(git_store)));
 856
 857            let diff = cx.new(|cx| BufferDiff::new(&text_snapshot, cx));
 858
 859            cx.subscribe(&diff, Self::on_buffer_diff_event).detach();
 860            diff_state.update(cx, |diff_state, cx| {
 861                diff_state.language_changed = true;
 862                diff_state.language = language;
 863                diff_state.language_registry = language_registry;
 864
 865                match kind {
 866                    DiffKind::Unstaged => diff_state.unstaged_diff = Some(diff.downgrade()),
 867                    DiffKind::Uncommitted => {
 868                        let unstaged_diff = if let Some(diff) = diff_state.unstaged_diff() {
 869                            diff
 870                        } else {
 871                            let unstaged_diff = cx.new(|cx| BufferDiff::new(&text_snapshot, cx));
 872                            diff_state.unstaged_diff = Some(unstaged_diff.downgrade());
 873                            unstaged_diff
 874                        };
 875
 876                        diff.update(cx, |diff, _| diff.set_secondary_diff(unstaged_diff));
 877                        diff_state.uncommitted_diff = Some(diff.downgrade())
 878                    }
 879                }
 880
 881                diff_state.diff_bases_changed(text_snapshot, Some(diff_bases_change), cx);
 882                let rx = diff_state.wait_for_recalculation();
 883
 884                anyhow::Ok(async move {
 885                    if let Some(rx) = rx {
 886                        rx.await;
 887                    }
 888                    Ok(diff)
 889                })
 890            })
 891        })??
 892        .await
 893    }
 894
 895    pub fn get_unstaged_diff(&self, buffer_id: BufferId, cx: &App) -> Option<Entity<BufferDiff>> {
 896        let diff_state = self.diffs.get(&buffer_id)?;
 897        diff_state.read(cx).unstaged_diff.as_ref()?.upgrade()
 898    }
 899
 900    pub fn get_uncommitted_diff(
 901        &self,
 902        buffer_id: BufferId,
 903        cx: &App,
 904    ) -> Option<Entity<BufferDiff>> {
 905        let diff_state = self.diffs.get(&buffer_id)?;
 906        diff_state.read(cx).uncommitted_diff.as_ref()?.upgrade()
 907    }
 908
 909    pub fn open_conflict_set(
 910        &mut self,
 911        buffer: Entity<Buffer>,
 912        cx: &mut Context<Self>,
 913    ) -> Entity<ConflictSet> {
 914        log::debug!("open conflict set");
 915        let buffer_id = buffer.read(cx).remote_id();
 916
 917        if let Some(git_state) = self.diffs.get(&buffer_id)
 918            && let Some(conflict_set) = git_state
 919                .read(cx)
 920                .conflict_set
 921                .as_ref()
 922                .and_then(|weak| weak.upgrade())
 923        {
 924            let conflict_set = conflict_set;
 925            let buffer_snapshot = buffer.read(cx).text_snapshot();
 926
 927            git_state.update(cx, |state, cx| {
 928                let _ = state.reparse_conflict_markers(buffer_snapshot, cx);
 929            });
 930
 931            return conflict_set;
 932        }
 933
 934        let is_unmerged = self
 935            .repository_and_path_for_buffer_id(buffer_id, cx)
 936            .is_some_and(|(repo, path)| repo.read(cx).snapshot.has_conflict(&path));
 937        let git_store = cx.weak_entity();
 938        let buffer_git_state = self
 939            .diffs
 940            .entry(buffer_id)
 941            .or_insert_with(|| cx.new(|_| BufferGitState::new(git_store)));
 942        let conflict_set = cx.new(|cx| ConflictSet::new(buffer_id, is_unmerged, cx));
 943
 944        self._subscriptions
 945            .push(cx.subscribe(&conflict_set, |_, _, _, cx| {
 946                cx.emit(GitStoreEvent::ConflictsUpdated);
 947            }));
 948
 949        buffer_git_state.update(cx, |state, cx| {
 950            state.conflict_set = Some(conflict_set.downgrade());
 951            let buffer_snapshot = buffer.read(cx).text_snapshot();
 952            let _ = state.reparse_conflict_markers(buffer_snapshot, cx);
 953        });
 954
 955        conflict_set
 956    }
 957
 958    pub fn project_path_git_status(
 959        &self,
 960        project_path: &ProjectPath,
 961        cx: &App,
 962    ) -> Option<FileStatus> {
 963        let (repo, repo_path) = self.repository_and_path_for_project_path(project_path, cx)?;
 964        Some(repo.read(cx).status_for_path(&repo_path)?.status)
 965    }
 966
 967    pub fn checkpoint(&self, cx: &mut App) -> Task<Result<GitStoreCheckpoint>> {
 968        let mut work_directory_abs_paths = Vec::new();
 969        let mut checkpoints = Vec::new();
 970        for repository in self.repositories.values() {
 971            repository.update(cx, |repository, _| {
 972                work_directory_abs_paths.push(repository.snapshot.work_directory_abs_path.clone());
 973                checkpoints.push(repository.checkpoint().map(|checkpoint| checkpoint?));
 974            });
 975        }
 976
 977        cx.background_executor().spawn(async move {
 978            let checkpoints = future::try_join_all(checkpoints).await?;
 979            Ok(GitStoreCheckpoint {
 980                checkpoints_by_work_dir_abs_path: work_directory_abs_paths
 981                    .into_iter()
 982                    .zip(checkpoints)
 983                    .collect(),
 984            })
 985        })
 986    }
 987
 988    pub fn restore_checkpoint(
 989        &self,
 990        checkpoint: GitStoreCheckpoint,
 991        cx: &mut App,
 992    ) -> Task<Result<()>> {
 993        let repositories_by_work_dir_abs_path = self
 994            .repositories
 995            .values()
 996            .map(|repo| (repo.read(cx).snapshot.work_directory_abs_path.clone(), repo))
 997            .collect::<HashMap<_, _>>();
 998
 999        let mut tasks = Vec::new();
1000        for (work_dir_abs_path, checkpoint) in checkpoint.checkpoints_by_work_dir_abs_path {
1001            if let Some(repository) = repositories_by_work_dir_abs_path.get(&work_dir_abs_path) {
1002                let restore = repository.update(cx, |repository, _| {
1003                    repository.restore_checkpoint(checkpoint)
1004                });
1005                tasks.push(async move { restore.await? });
1006            }
1007        }
1008        cx.background_spawn(async move {
1009            future::try_join_all(tasks).await?;
1010            Ok(())
1011        })
1012    }
1013
1014    /// Compares two checkpoints, returning true if they are equal.
1015    pub fn compare_checkpoints(
1016        &self,
1017        left: GitStoreCheckpoint,
1018        mut right: GitStoreCheckpoint,
1019        cx: &mut App,
1020    ) -> Task<Result<bool>> {
1021        let repositories_by_work_dir_abs_path = self
1022            .repositories
1023            .values()
1024            .map(|repo| (repo.read(cx).snapshot.work_directory_abs_path.clone(), repo))
1025            .collect::<HashMap<_, _>>();
1026
1027        let mut tasks = Vec::new();
1028        for (work_dir_abs_path, left_checkpoint) in left.checkpoints_by_work_dir_abs_path {
1029            if let Some(right_checkpoint) = right
1030                .checkpoints_by_work_dir_abs_path
1031                .remove(&work_dir_abs_path)
1032            {
1033                if let Some(repository) = repositories_by_work_dir_abs_path.get(&work_dir_abs_path)
1034                {
1035                    let compare = repository.update(cx, |repository, _| {
1036                        repository.compare_checkpoints(left_checkpoint, right_checkpoint)
1037                    });
1038
1039                    tasks.push(async move { compare.await? });
1040                }
1041            } else {
1042                return Task::ready(Ok(false));
1043            }
1044        }
1045        cx.background_spawn(async move {
1046            Ok(future::try_join_all(tasks)
1047                .await?
1048                .into_iter()
1049                .all(|result| result))
1050        })
1051    }
1052
1053    /// Blames a buffer.
1054    pub fn blame_buffer(
1055        &self,
1056        buffer: &Entity<Buffer>,
1057        version: Option<clock::Global>,
1058        cx: &mut Context<Self>,
1059    ) -> Task<Result<Option<Blame>>> {
1060        let buffer = buffer.read(cx);
1061        let Some((repo, repo_path)) =
1062            self.repository_and_path_for_buffer_id(buffer.remote_id(), cx)
1063        else {
1064            return Task::ready(Err(anyhow!("failed to find a git repository for buffer")));
1065        };
1066        let content = match &version {
1067            Some(version) => buffer.rope_for_version(version),
1068            None => buffer.as_rope().clone(),
1069        };
1070        let line_ending = buffer.line_ending();
1071        let version = version.unwrap_or(buffer.version());
1072        let buffer_id = buffer.remote_id();
1073
1074        let repo = repo.downgrade();
1075        cx.spawn(async move |_, cx| {
1076            let repository_state = repo
1077                .update(cx, |repo, _| repo.repository_state.clone())?
1078                .await
1079                .map_err(|err| anyhow::anyhow!(err))?;
1080            match repository_state {
1081                RepositoryState::Local(LocalRepositoryState { backend, .. }) => backend
1082                    .blame(repo_path.clone(), content, line_ending)
1083                    .await
1084                    .with_context(|| format!("Failed to blame {:?}", repo_path.as_ref()))
1085                    .map(Some),
1086                RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
1087                    let response = client
1088                        .request(proto::BlameBuffer {
1089                            project_id: project_id.to_proto(),
1090                            buffer_id: buffer_id.into(),
1091                            version: serialize_version(&version),
1092                        })
1093                        .await?;
1094                    Ok(deserialize_blame_buffer_response(response))
1095                }
1096            }
1097        })
1098    }
1099
1100    pub fn file_history(
1101        &self,
1102        repo: &Entity<Repository>,
1103        path: RepoPath,
1104        cx: &mut App,
1105    ) -> Task<Result<git::repository::FileHistory>> {
1106        let rx = repo.update(cx, |repo, _| repo.file_history(path));
1107
1108        cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
1109    }
1110
1111    pub fn file_history_paginated(
1112        &self,
1113        repo: &Entity<Repository>,
1114        path: RepoPath,
1115        skip: usize,
1116        limit: Option<usize>,
1117        cx: &mut App,
1118    ) -> Task<Result<git::repository::FileHistory>> {
1119        let rx = repo.update(cx, |repo, _| repo.file_history_paginated(path, skip, limit));
1120
1121        cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
1122    }
1123
1124    pub fn get_permalink_to_line(
1125        &self,
1126        buffer: &Entity<Buffer>,
1127        selection: Range<u32>,
1128        cx: &mut App,
1129    ) -> Task<Result<url::Url>> {
1130        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
1131            return Task::ready(Err(anyhow!("buffer has no file")));
1132        };
1133
1134        let Some((repo, repo_path)) = self.repository_and_path_for_project_path(
1135            &(file.worktree.read(cx).id(), file.path.clone()).into(),
1136            cx,
1137        ) else {
1138            // If we're not in a Git repo, check whether this is a Rust source
1139            // file in the Cargo registry (presumably opened with go-to-definition
1140            // from a normal Rust file). If so, we can put together a permalink
1141            // using crate metadata.
1142            if buffer
1143                .read(cx)
1144                .language()
1145                .is_none_or(|lang| lang.name() != "Rust".into())
1146            {
1147                return Task::ready(Err(anyhow!("no permalink available")));
1148            }
1149            let file_path = file.worktree.read(cx).absolutize(&file.path);
1150            return cx.spawn(async move |cx| {
1151                let provider_registry = cx.update(GitHostingProviderRegistry::default_global);
1152                get_permalink_in_rust_registry_src(provider_registry, file_path, selection)
1153                    .context("no permalink available")
1154            });
1155        };
1156
1157        let buffer_id = buffer.read(cx).remote_id();
1158        let branch = repo.read(cx).branch.clone();
1159        let remote = branch
1160            .as_ref()
1161            .and_then(|b| b.upstream.as_ref())
1162            .and_then(|b| b.remote_name())
1163            .unwrap_or("origin")
1164            .to_string();
1165
1166        let rx = repo.update(cx, |repo, _| {
1167            repo.send_job(None, move |state, cx| async move {
1168                match state {
1169                    RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
1170                        let origin_url = backend
1171                            .remote_url(&remote)
1172                            .await
1173                            .with_context(|| format!("remote \"{remote}\" not found"))?;
1174
1175                        let sha = backend.head_sha().await.context("reading HEAD SHA")?;
1176
1177                        let provider_registry =
1178                            cx.update(GitHostingProviderRegistry::default_global);
1179
1180                        let (provider, remote) =
1181                            parse_git_remote_url(provider_registry, &origin_url)
1182                                .context("parsing Git remote URL")?;
1183
1184                        Ok(provider.build_permalink(
1185                            remote,
1186                            BuildPermalinkParams::new(&sha, &repo_path, Some(selection)),
1187                        ))
1188                    }
1189                    RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
1190                        let response = client
1191                            .request(proto::GetPermalinkToLine {
1192                                project_id: project_id.to_proto(),
1193                                buffer_id: buffer_id.into(),
1194                                selection: Some(proto::Range {
1195                                    start: selection.start as u64,
1196                                    end: selection.end as u64,
1197                                }),
1198                            })
1199                            .await?;
1200
1201                        url::Url::parse(&response.permalink).context("failed to parse permalink")
1202                    }
1203                }
1204            })
1205        });
1206        cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
1207    }
1208
1209    fn downstream_client(&self) -> Option<(AnyProtoClient, ProjectId)> {
1210        match &self.state {
1211            GitStoreState::Local {
1212                downstream: downstream_client,
1213                ..
1214            } => downstream_client
1215                .as_ref()
1216                .map(|state| (state.client.clone(), state.project_id)),
1217            GitStoreState::Remote {
1218                downstream: downstream_client,
1219                ..
1220            } => downstream_client.clone(),
1221        }
1222    }
1223
1224    fn upstream_client(&self) -> Option<AnyProtoClient> {
1225        match &self.state {
1226            GitStoreState::Local { .. } => None,
1227            GitStoreState::Remote {
1228                upstream_client, ..
1229            } => Some(upstream_client.clone()),
1230        }
1231    }
1232
1233    fn on_worktree_store_event(
1234        &mut self,
1235        worktree_store: Entity<WorktreeStore>,
1236        event: &WorktreeStoreEvent,
1237        cx: &mut Context<Self>,
1238    ) {
1239        let GitStoreState::Local {
1240            project_environment,
1241            downstream,
1242            next_repository_id,
1243            fs,
1244        } = &self.state
1245        else {
1246            return;
1247        };
1248
1249        match event {
1250            WorktreeStoreEvent::WorktreeUpdatedEntries(worktree_id, updated_entries) => {
1251                if let Some(worktree) = self
1252                    .worktree_store
1253                    .read(cx)
1254                    .worktree_for_id(*worktree_id, cx)
1255                {
1256                    let paths_by_git_repo =
1257                        self.process_updated_entries(&worktree, updated_entries, cx);
1258                    let downstream = downstream
1259                        .as_ref()
1260                        .map(|downstream| downstream.updates_tx.clone());
1261                    cx.spawn(async move |_, cx| {
1262                        let paths_by_git_repo = paths_by_git_repo.await;
1263                        for (repo, paths) in paths_by_git_repo {
1264                            repo.update(cx, |repo, cx| {
1265                                repo.paths_changed(paths, downstream.clone(), cx);
1266                            });
1267                        }
1268                    })
1269                    .detach();
1270                }
1271            }
1272            WorktreeStoreEvent::WorktreeUpdatedGitRepositories(worktree_id, changed_repos) => {
1273                let Some(worktree) = worktree_store.read(cx).worktree_for_id(*worktree_id, cx)
1274                else {
1275                    return;
1276                };
1277                if !worktree.read(cx).is_visible() {
1278                    log::debug!(
1279                        "not adding repositories for local worktree {:?} because it's not visible",
1280                        worktree.read(cx).abs_path()
1281                    );
1282                    return;
1283                }
1284                self.update_repositories_from_worktree(
1285                    *worktree_id,
1286                    project_environment.clone(),
1287                    next_repository_id.clone(),
1288                    downstream
1289                        .as_ref()
1290                        .map(|downstream| downstream.updates_tx.clone()),
1291                    changed_repos.clone(),
1292                    fs.clone(),
1293                    cx,
1294                );
1295                self.local_worktree_git_repos_changed(worktree, changed_repos, cx);
1296            }
1297            WorktreeStoreEvent::WorktreeRemoved(_entity_id, worktree_id) => {
1298                let repos_without_worktree: Vec<RepositoryId> = self
1299                    .worktree_ids
1300                    .iter_mut()
1301                    .filter_map(|(repo_id, worktree_ids)| {
1302                        worktree_ids.remove(worktree_id);
1303                        if worktree_ids.is_empty() {
1304                            Some(*repo_id)
1305                        } else {
1306                            None
1307                        }
1308                    })
1309                    .collect();
1310                let is_active_repo_removed = repos_without_worktree
1311                    .iter()
1312                    .any(|repo_id| self.active_repo_id == Some(*repo_id));
1313
1314                for repo_id in repos_without_worktree {
1315                    self.repositories.remove(&repo_id);
1316                    self.worktree_ids.remove(&repo_id);
1317                    if let Some(updates_tx) =
1318                        downstream.as_ref().map(|downstream| &downstream.updates_tx)
1319                    {
1320                        updates_tx
1321                            .unbounded_send(DownstreamUpdate::RemoveRepository(repo_id))
1322                            .ok();
1323                    }
1324                }
1325
1326                if is_active_repo_removed {
1327                    if let Some((&repo_id, _)) = self.repositories.iter().next() {
1328                        self.active_repo_id = Some(repo_id);
1329                        cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(repo_id)));
1330                    } else {
1331                        self.active_repo_id = None;
1332                        cx.emit(GitStoreEvent::ActiveRepositoryChanged(None));
1333                    }
1334                }
1335            }
1336            _ => {}
1337        }
1338    }
1339    fn on_repository_event(
1340        &mut self,
1341        repo: Entity<Repository>,
1342        event: &RepositoryEvent,
1343        cx: &mut Context<Self>,
1344    ) {
1345        let id = repo.read(cx).id;
1346        let repo_snapshot = repo.read(cx).snapshot.clone();
1347        for (buffer_id, diff) in self.diffs.iter() {
1348            if let Some((buffer_repo, repo_path)) =
1349                self.repository_and_path_for_buffer_id(*buffer_id, cx)
1350                && buffer_repo == repo
1351            {
1352                diff.update(cx, |diff, cx| {
1353                    if let Some(conflict_set) = &diff.conflict_set {
1354                        let conflict_status_changed =
1355                            conflict_set.update(cx, |conflict_set, cx| {
1356                                let has_conflict = repo_snapshot.has_conflict(&repo_path);
1357                                conflict_set.set_has_conflict(has_conflict, cx)
1358                            })?;
1359                        if conflict_status_changed {
1360                            let buffer_store = self.buffer_store.read(cx);
1361                            if let Some(buffer) = buffer_store.get(*buffer_id) {
1362                                let _ = diff
1363                                    .reparse_conflict_markers(buffer.read(cx).text_snapshot(), cx);
1364                            }
1365                        }
1366                    }
1367                    anyhow::Ok(())
1368                })
1369                .ok();
1370            }
1371        }
1372        cx.emit(GitStoreEvent::RepositoryUpdated(
1373            id,
1374            event.clone(),
1375            self.active_repo_id == Some(id),
1376        ))
1377    }
1378
1379    fn on_jobs_updated(&mut self, _: Entity<Repository>, _: &JobsUpdated, cx: &mut Context<Self>) {
1380        cx.emit(GitStoreEvent::JobsUpdated)
1381    }
1382
1383    /// Update our list of repositories and schedule git scans in response to a notification from a worktree,
1384    fn update_repositories_from_worktree(
1385        &mut self,
1386        worktree_id: WorktreeId,
1387        project_environment: Entity<ProjectEnvironment>,
1388        next_repository_id: Arc<AtomicU64>,
1389        updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
1390        updated_git_repositories: UpdatedGitRepositoriesSet,
1391        fs: Arc<dyn Fs>,
1392        cx: &mut Context<Self>,
1393    ) {
1394        let mut removed_ids = Vec::new();
1395        for update in updated_git_repositories.iter() {
1396            if let Some((id, existing)) = self.repositories.iter().find(|(_, repo)| {
1397                let existing_work_directory_abs_path =
1398                    repo.read(cx).work_directory_abs_path.clone();
1399                Some(&existing_work_directory_abs_path)
1400                    == update.old_work_directory_abs_path.as_ref()
1401                    || Some(&existing_work_directory_abs_path)
1402                        == update.new_work_directory_abs_path.as_ref()
1403            }) {
1404                let repo_id = *id;
1405                if let Some(new_work_directory_abs_path) =
1406                    update.new_work_directory_abs_path.clone()
1407                {
1408                    self.worktree_ids
1409                        .entry(repo_id)
1410                        .or_insert_with(HashSet::new)
1411                        .insert(worktree_id);
1412                    existing.update(cx, |existing, cx| {
1413                        existing.snapshot.work_directory_abs_path = new_work_directory_abs_path;
1414                        existing.schedule_scan(updates_tx.clone(), cx);
1415                    });
1416                } else {
1417                    if let Some(worktree_ids) = self.worktree_ids.get_mut(&repo_id) {
1418                        worktree_ids.remove(&worktree_id);
1419                        if worktree_ids.is_empty() {
1420                            removed_ids.push(repo_id);
1421                        }
1422                    }
1423                }
1424            } else if let UpdatedGitRepository {
1425                new_work_directory_abs_path: Some(work_directory_abs_path),
1426                dot_git_abs_path: Some(dot_git_abs_path),
1427                repository_dir_abs_path: Some(_repository_dir_abs_path),
1428                common_dir_abs_path: Some(_common_dir_abs_path),
1429                ..
1430            } = update
1431            {
1432                let id = RepositoryId(next_repository_id.fetch_add(1, atomic::Ordering::Release));
1433                let git_store = cx.weak_entity();
1434                let repo = cx.new(|cx| {
1435                    let mut repo = Repository::local(
1436                        id,
1437                        work_directory_abs_path.clone(),
1438                        dot_git_abs_path.clone(),
1439                        project_environment.downgrade(),
1440                        fs.clone(),
1441                        git_store,
1442                        cx,
1443                    );
1444                    if let Some(updates_tx) = updates_tx.as_ref() {
1445                        // trigger an empty `UpdateRepository` to ensure remote active_repo_id is set correctly
1446                        updates_tx
1447                            .unbounded_send(DownstreamUpdate::UpdateRepository(repo.snapshot()))
1448                            .ok();
1449                    }
1450                    repo.schedule_scan(updates_tx.clone(), cx);
1451                    repo
1452                });
1453                self._subscriptions
1454                    .push(cx.subscribe(&repo, Self::on_repository_event));
1455                self._subscriptions
1456                    .push(cx.subscribe(&repo, Self::on_jobs_updated));
1457                self.repositories.insert(id, repo);
1458                self.worktree_ids.insert(id, HashSet::from([worktree_id]));
1459                cx.emit(GitStoreEvent::RepositoryAdded);
1460                self.active_repo_id.get_or_insert_with(|| {
1461                    cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
1462                    id
1463                });
1464            }
1465        }
1466
1467        for id in removed_ids {
1468            if self.active_repo_id == Some(id) {
1469                self.active_repo_id = None;
1470                cx.emit(GitStoreEvent::ActiveRepositoryChanged(None));
1471            }
1472            self.repositories.remove(&id);
1473            if let Some(updates_tx) = updates_tx.as_ref() {
1474                updates_tx
1475                    .unbounded_send(DownstreamUpdate::RemoveRepository(id))
1476                    .ok();
1477            }
1478        }
1479    }
1480
1481    fn on_buffer_store_event(
1482        &mut self,
1483        _: Entity<BufferStore>,
1484        event: &BufferStoreEvent,
1485        cx: &mut Context<Self>,
1486    ) {
1487        match event {
1488            BufferStoreEvent::BufferAdded(buffer) => {
1489                cx.subscribe(buffer, |this, buffer, event, cx| {
1490                    if let BufferEvent::LanguageChanged(_) = event {
1491                        let buffer_id = buffer.read(cx).remote_id();
1492                        if let Some(diff_state) = this.diffs.get(&buffer_id) {
1493                            diff_state.update(cx, |diff_state, cx| {
1494                                diff_state.buffer_language_changed(buffer, cx);
1495                            });
1496                        }
1497                    }
1498                })
1499                .detach();
1500            }
1501            BufferStoreEvent::SharedBufferClosed(peer_id, buffer_id) => {
1502                if let Some(diffs) = self.shared_diffs.get_mut(peer_id) {
1503                    diffs.remove(buffer_id);
1504                }
1505            }
1506            BufferStoreEvent::BufferDropped(buffer_id) => {
1507                self.diffs.remove(buffer_id);
1508                for diffs in self.shared_diffs.values_mut() {
1509                    diffs.remove(buffer_id);
1510                }
1511            }
1512            BufferStoreEvent::BufferChangedFilePath { buffer, .. } => {
1513                // Whenever a buffer's file path changes, it's possible that the
1514                // new path is actually a path that is being tracked by a git
1515                // repository. In that case, we'll want to update the buffer's
1516                // `BufferDiffState`, in case it already has one.
1517                let buffer_id = buffer.read(cx).remote_id();
1518                let diff_state = self.diffs.get(&buffer_id);
1519                let repo = self.repository_and_path_for_buffer_id(buffer_id, cx);
1520
1521                if let Some(diff_state) = diff_state
1522                    && let Some((repo, repo_path)) = repo
1523                {
1524                    let buffer = buffer.clone();
1525                    let diff_state = diff_state.clone();
1526
1527                    cx.spawn(async move |_git_store, cx| {
1528                        async {
1529                            let diff_bases_change = repo
1530                                .update(cx, |repo, cx| {
1531                                    repo.load_committed_text(buffer_id, repo_path, cx)
1532                                })
1533                                .await?;
1534
1535                            diff_state.update(cx, |diff_state, cx| {
1536                                let buffer_snapshot = buffer.read(cx).text_snapshot();
1537                                diff_state.diff_bases_changed(
1538                                    buffer_snapshot,
1539                                    Some(diff_bases_change),
1540                                    cx,
1541                                );
1542                            });
1543                            anyhow::Ok(())
1544                        }
1545                        .await
1546                        .log_err();
1547                    })
1548                    .detach();
1549                }
1550            }
1551            _ => {}
1552        }
1553    }
1554
1555    pub fn recalculate_buffer_diffs(
1556        &mut self,
1557        buffers: Vec<Entity<Buffer>>,
1558        cx: &mut Context<Self>,
1559    ) -> impl Future<Output = ()> + use<> {
1560        let mut futures = Vec::new();
1561        for buffer in buffers {
1562            if let Some(diff_state) = self.diffs.get_mut(&buffer.read(cx).remote_id()) {
1563                let buffer = buffer.read(cx).text_snapshot();
1564                diff_state.update(cx, |diff_state, cx| {
1565                    diff_state.recalculate_diffs(buffer.clone(), cx);
1566                    futures.extend(diff_state.wait_for_recalculation().map(FutureExt::boxed));
1567                });
1568                futures.push(diff_state.update(cx, |diff_state, cx| {
1569                    diff_state
1570                        .reparse_conflict_markers(buffer, cx)
1571                        .map(|_| {})
1572                        .boxed()
1573                }));
1574            }
1575        }
1576        async move {
1577            futures::future::join_all(futures).await;
1578        }
1579    }
1580
1581    fn on_buffer_diff_event(
1582        &mut self,
1583        diff: Entity<buffer_diff::BufferDiff>,
1584        event: &BufferDiffEvent,
1585        cx: &mut Context<Self>,
1586    ) {
1587        if let BufferDiffEvent::HunksStagedOrUnstaged(new_index_text) = event {
1588            let buffer_id = diff.read(cx).buffer_id;
1589            if let Some(diff_state) = self.diffs.get(&buffer_id) {
1590                let hunk_staging_operation_count = diff_state.update(cx, |diff_state, _| {
1591                    diff_state.hunk_staging_operation_count += 1;
1592                    diff_state.hunk_staging_operation_count
1593                });
1594                if let Some((repo, path)) = self.repository_and_path_for_buffer_id(buffer_id, cx) {
1595                    let recv = repo.update(cx, |repo, cx| {
1596                        log::debug!("hunks changed for {}", path.as_unix_str());
1597                        repo.spawn_set_index_text_job(
1598                            path,
1599                            new_index_text.as_ref().map(|rope| rope.to_string()),
1600                            Some(hunk_staging_operation_count),
1601                            cx,
1602                        )
1603                    });
1604                    let diff = diff.downgrade();
1605                    cx.spawn(async move |this, cx| {
1606                        if let Ok(Err(error)) = cx.background_spawn(recv).await {
1607                            diff.update(cx, |diff, cx| {
1608                                diff.clear_pending_hunks(cx);
1609                            })
1610                            .ok();
1611                            this.update(cx, |_, cx| cx.emit(GitStoreEvent::IndexWriteError(error)))
1612                                .ok();
1613                        }
1614                    })
1615                    .detach();
1616                }
1617            }
1618        }
1619    }
1620
1621    fn local_worktree_git_repos_changed(
1622        &mut self,
1623        worktree: Entity<Worktree>,
1624        changed_repos: &UpdatedGitRepositoriesSet,
1625        cx: &mut Context<Self>,
1626    ) {
1627        log::debug!("local worktree repos changed");
1628        debug_assert!(worktree.read(cx).is_local());
1629
1630        for repository in self.repositories.values() {
1631            repository.update(cx, |repository, cx| {
1632                let repo_abs_path = &repository.work_directory_abs_path;
1633                if changed_repos.iter().any(|update| {
1634                    update.old_work_directory_abs_path.as_ref() == Some(repo_abs_path)
1635                        || update.new_work_directory_abs_path.as_ref() == Some(repo_abs_path)
1636                }) {
1637                    repository.reload_buffer_diff_bases(cx);
1638                }
1639            });
1640        }
1641    }
1642
1643    pub fn repositories(&self) -> &HashMap<RepositoryId, Entity<Repository>> {
1644        &self.repositories
1645    }
1646
1647    pub fn status_for_buffer_id(&self, buffer_id: BufferId, cx: &App) -> Option<FileStatus> {
1648        let (repo, path) = self.repository_and_path_for_buffer_id(buffer_id, cx)?;
1649        let status = repo.read(cx).snapshot.status_for_path(&path)?;
1650        Some(status.status)
1651    }
1652
1653    pub fn repository_and_path_for_buffer_id(
1654        &self,
1655        buffer_id: BufferId,
1656        cx: &App,
1657    ) -> Option<(Entity<Repository>, RepoPath)> {
1658        let buffer = self.buffer_store.read(cx).get(buffer_id)?;
1659        let project_path = buffer.read(cx).project_path(cx)?;
1660        self.repository_and_path_for_project_path(&project_path, cx)
1661    }
1662
1663    pub fn repository_and_path_for_project_path(
1664        &self,
1665        path: &ProjectPath,
1666        cx: &App,
1667    ) -> Option<(Entity<Repository>, RepoPath)> {
1668        let abs_path = self.worktree_store.read(cx).absolutize(path, cx)?;
1669        self.repositories
1670            .values()
1671            .filter_map(|repo| {
1672                let repo_path = repo.read(cx).abs_path_to_repo_path(&abs_path)?;
1673                Some((repo.clone(), repo_path))
1674            })
1675            .max_by_key(|(repo, _)| repo.read(cx).work_directory_abs_path.clone())
1676    }
1677
1678    pub fn git_init(
1679        &self,
1680        path: Arc<Path>,
1681        fallback_branch_name: String,
1682        cx: &App,
1683    ) -> Task<Result<()>> {
1684        match &self.state {
1685            GitStoreState::Local { fs, .. } => {
1686                let fs = fs.clone();
1687                cx.background_executor()
1688                    .spawn(async move { fs.git_init(&path, fallback_branch_name).await })
1689            }
1690            GitStoreState::Remote {
1691                upstream_client,
1692                upstream_project_id: project_id,
1693                ..
1694            } => {
1695                let client = upstream_client.clone();
1696                let project_id = *project_id;
1697                cx.background_executor().spawn(async move {
1698                    client
1699                        .request(proto::GitInit {
1700                            project_id: project_id,
1701                            abs_path: path.to_string_lossy().into_owned(),
1702                            fallback_branch_name,
1703                        })
1704                        .await?;
1705                    Ok(())
1706                })
1707            }
1708        }
1709    }
1710
1711    pub fn git_clone(
1712        &self,
1713        repo: String,
1714        path: impl Into<Arc<std::path::Path>>,
1715        cx: &App,
1716    ) -> Task<Result<()>> {
1717        let path = path.into();
1718        match &self.state {
1719            GitStoreState::Local { fs, .. } => {
1720                let fs = fs.clone();
1721                cx.background_executor()
1722                    .spawn(async move { fs.git_clone(&repo, &path).await })
1723            }
1724            GitStoreState::Remote {
1725                upstream_client,
1726                upstream_project_id,
1727                ..
1728            } => {
1729                if upstream_client.is_via_collab() {
1730                    return Task::ready(Err(anyhow!(
1731                        "Git Clone isn't supported for project guests"
1732                    )));
1733                }
1734                let request = upstream_client.request(proto::GitClone {
1735                    project_id: *upstream_project_id,
1736                    abs_path: path.to_string_lossy().into_owned(),
1737                    remote_repo: repo,
1738                });
1739
1740                cx.background_spawn(async move {
1741                    let result = request.await?;
1742
1743                    match result.success {
1744                        true => Ok(()),
1745                        false => Err(anyhow!("Git Clone failed")),
1746                    }
1747                })
1748            }
1749        }
1750    }
1751
1752    async fn handle_update_repository(
1753        this: Entity<Self>,
1754        envelope: TypedEnvelope<proto::UpdateRepository>,
1755        mut cx: AsyncApp,
1756    ) -> Result<()> {
1757        this.update(&mut cx, |this, cx| {
1758            let path_style = this.worktree_store.read(cx).path_style();
1759            let mut update = envelope.payload;
1760
1761            let id = RepositoryId::from_proto(update.id);
1762            let client = this.upstream_client().context("no upstream client")?;
1763
1764            let mut repo_subscription = None;
1765            let repo = this.repositories.entry(id).or_insert_with(|| {
1766                let git_store = cx.weak_entity();
1767                let repo = cx.new(|cx| {
1768                    Repository::remote(
1769                        id,
1770                        Path::new(&update.abs_path).into(),
1771                        path_style,
1772                        ProjectId(update.project_id),
1773                        client,
1774                        git_store,
1775                        cx,
1776                    )
1777                });
1778                repo_subscription = Some(cx.subscribe(&repo, Self::on_repository_event));
1779                cx.emit(GitStoreEvent::RepositoryAdded);
1780                repo
1781            });
1782            this._subscriptions.extend(repo_subscription);
1783
1784            repo.update(cx, {
1785                let update = update.clone();
1786                |repo, cx| repo.apply_remote_update(update, cx)
1787            })?;
1788
1789            this.active_repo_id.get_or_insert_with(|| {
1790                cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
1791                id
1792            });
1793
1794            if let Some((client, project_id)) = this.downstream_client() {
1795                update.project_id = project_id.to_proto();
1796                client.send(update).log_err();
1797            }
1798            Ok(())
1799        })
1800    }
1801
1802    async fn handle_remove_repository(
1803        this: Entity<Self>,
1804        envelope: TypedEnvelope<proto::RemoveRepository>,
1805        mut cx: AsyncApp,
1806    ) -> Result<()> {
1807        this.update(&mut cx, |this, cx| {
1808            let mut update = envelope.payload;
1809            let id = RepositoryId::from_proto(update.id);
1810            this.repositories.remove(&id);
1811            if let Some((client, project_id)) = this.downstream_client() {
1812                update.project_id = project_id.to_proto();
1813                client.send(update).log_err();
1814            }
1815            if this.active_repo_id == Some(id) {
1816                this.active_repo_id = None;
1817                cx.emit(GitStoreEvent::ActiveRepositoryChanged(None));
1818            }
1819            cx.emit(GitStoreEvent::RepositoryRemoved(id));
1820        });
1821        Ok(())
1822    }
1823
1824    async fn handle_git_init(
1825        this: Entity<Self>,
1826        envelope: TypedEnvelope<proto::GitInit>,
1827        cx: AsyncApp,
1828    ) -> Result<proto::Ack> {
1829        let path: Arc<Path> = PathBuf::from(envelope.payload.abs_path).into();
1830        let name = envelope.payload.fallback_branch_name;
1831        cx.update(|cx| this.read(cx).git_init(path, name, cx))
1832            .await?;
1833
1834        Ok(proto::Ack {})
1835    }
1836
1837    async fn handle_git_clone(
1838        this: Entity<Self>,
1839        envelope: TypedEnvelope<proto::GitClone>,
1840        cx: AsyncApp,
1841    ) -> Result<proto::GitCloneResponse> {
1842        let path: Arc<Path> = PathBuf::from(envelope.payload.abs_path).into();
1843        let repo_name = envelope.payload.remote_repo;
1844        let result = cx
1845            .update(|cx| this.read(cx).git_clone(repo_name, path, cx))
1846            .await;
1847
1848        Ok(proto::GitCloneResponse {
1849            success: result.is_ok(),
1850        })
1851    }
1852
1853    async fn handle_fetch(
1854        this: Entity<Self>,
1855        envelope: TypedEnvelope<proto::Fetch>,
1856        mut cx: AsyncApp,
1857    ) -> Result<proto::RemoteMessageResponse> {
1858        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1859        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1860        let fetch_options = FetchOptions::from_proto(envelope.payload.remote);
1861        let askpass_id = envelope.payload.askpass_id;
1862
1863        let askpass = make_remote_delegate(
1864            this,
1865            envelope.payload.project_id,
1866            repository_id,
1867            askpass_id,
1868            &mut cx,
1869        );
1870
1871        let remote_output = repository_handle
1872            .update(&mut cx, |repository_handle, cx| {
1873                repository_handle.fetch(fetch_options, askpass, cx)
1874            })
1875            .await??;
1876
1877        Ok(proto::RemoteMessageResponse {
1878            stdout: remote_output.stdout,
1879            stderr: remote_output.stderr,
1880        })
1881    }
1882
1883    async fn handle_push(
1884        this: Entity<Self>,
1885        envelope: TypedEnvelope<proto::Push>,
1886        mut cx: AsyncApp,
1887    ) -> Result<proto::RemoteMessageResponse> {
1888        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1889        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1890
1891        let askpass_id = envelope.payload.askpass_id;
1892        let askpass = make_remote_delegate(
1893            this,
1894            envelope.payload.project_id,
1895            repository_id,
1896            askpass_id,
1897            &mut cx,
1898        );
1899
1900        let options = envelope
1901            .payload
1902            .options
1903            .as_ref()
1904            .map(|_| match envelope.payload.options() {
1905                proto::push::PushOptions::SetUpstream => git::repository::PushOptions::SetUpstream,
1906                proto::push::PushOptions::Force => git::repository::PushOptions::Force,
1907            });
1908
1909        let branch_name = envelope.payload.branch_name.into();
1910        let remote_branch_name = envelope.payload.remote_branch_name.into();
1911        let remote_name = envelope.payload.remote_name.into();
1912
1913        let remote_output = repository_handle
1914            .update(&mut cx, |repository_handle, cx| {
1915                repository_handle.push(
1916                    branch_name,
1917                    remote_branch_name,
1918                    remote_name,
1919                    options,
1920                    askpass,
1921                    cx,
1922                )
1923            })
1924            .await??;
1925        Ok(proto::RemoteMessageResponse {
1926            stdout: remote_output.stdout,
1927            stderr: remote_output.stderr,
1928        })
1929    }
1930
1931    async fn handle_pull(
1932        this: Entity<Self>,
1933        envelope: TypedEnvelope<proto::Pull>,
1934        mut cx: AsyncApp,
1935    ) -> Result<proto::RemoteMessageResponse> {
1936        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1937        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1938        let askpass_id = envelope.payload.askpass_id;
1939        let askpass = make_remote_delegate(
1940            this,
1941            envelope.payload.project_id,
1942            repository_id,
1943            askpass_id,
1944            &mut cx,
1945        );
1946
1947        let branch_name = envelope.payload.branch_name.map(|name| name.into());
1948        let remote_name = envelope.payload.remote_name.into();
1949        let rebase = envelope.payload.rebase;
1950
1951        let remote_message = repository_handle
1952            .update(&mut cx, |repository_handle, cx| {
1953                repository_handle.pull(branch_name, remote_name, rebase, askpass, cx)
1954            })
1955            .await??;
1956
1957        Ok(proto::RemoteMessageResponse {
1958            stdout: remote_message.stdout,
1959            stderr: remote_message.stderr,
1960        })
1961    }
1962
1963    async fn handle_stage(
1964        this: Entity<Self>,
1965        envelope: TypedEnvelope<proto::Stage>,
1966        mut cx: AsyncApp,
1967    ) -> Result<proto::Ack> {
1968        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1969        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1970
1971        let entries = envelope
1972            .payload
1973            .paths
1974            .into_iter()
1975            .map(|path| RepoPath::new(&path))
1976            .collect::<Result<Vec<_>>>()?;
1977
1978        repository_handle
1979            .update(&mut cx, |repository_handle, cx| {
1980                repository_handle.stage_entries(entries, cx)
1981            })
1982            .await?;
1983        Ok(proto::Ack {})
1984    }
1985
1986    async fn handle_unstage(
1987        this: Entity<Self>,
1988        envelope: TypedEnvelope<proto::Unstage>,
1989        mut cx: AsyncApp,
1990    ) -> Result<proto::Ack> {
1991        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1992        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1993
1994        let entries = envelope
1995            .payload
1996            .paths
1997            .into_iter()
1998            .map(|path| RepoPath::new(&path))
1999            .collect::<Result<Vec<_>>>()?;
2000
2001        repository_handle
2002            .update(&mut cx, |repository_handle, cx| {
2003                repository_handle.unstage_entries(entries, cx)
2004            })
2005            .await?;
2006
2007        Ok(proto::Ack {})
2008    }
2009
2010    async fn handle_stash(
2011        this: Entity<Self>,
2012        envelope: TypedEnvelope<proto::Stash>,
2013        mut cx: AsyncApp,
2014    ) -> Result<proto::Ack> {
2015        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2016        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2017
2018        let entries = envelope
2019            .payload
2020            .paths
2021            .into_iter()
2022            .map(|path| RepoPath::new(&path))
2023            .collect::<Result<Vec<_>>>()?;
2024
2025        repository_handle
2026            .update(&mut cx, |repository_handle, cx| {
2027                repository_handle.stash_entries(entries, cx)
2028            })
2029            .await?;
2030
2031        Ok(proto::Ack {})
2032    }
2033
2034    async fn handle_stash_pop(
2035        this: Entity<Self>,
2036        envelope: TypedEnvelope<proto::StashPop>,
2037        mut cx: AsyncApp,
2038    ) -> Result<proto::Ack> {
2039        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2040        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2041        let stash_index = envelope.payload.stash_index.map(|i| i as usize);
2042
2043        repository_handle
2044            .update(&mut cx, |repository_handle, cx| {
2045                repository_handle.stash_pop(stash_index, cx)
2046            })
2047            .await?;
2048
2049        Ok(proto::Ack {})
2050    }
2051
2052    async fn handle_stash_apply(
2053        this: Entity<Self>,
2054        envelope: TypedEnvelope<proto::StashApply>,
2055        mut cx: AsyncApp,
2056    ) -> Result<proto::Ack> {
2057        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2058        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2059        let stash_index = envelope.payload.stash_index.map(|i| i as usize);
2060
2061        repository_handle
2062            .update(&mut cx, |repository_handle, cx| {
2063                repository_handle.stash_apply(stash_index, cx)
2064            })
2065            .await?;
2066
2067        Ok(proto::Ack {})
2068    }
2069
2070    async fn handle_stash_drop(
2071        this: Entity<Self>,
2072        envelope: TypedEnvelope<proto::StashDrop>,
2073        mut cx: AsyncApp,
2074    ) -> Result<proto::Ack> {
2075        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2076        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2077        let stash_index = envelope.payload.stash_index.map(|i| i as usize);
2078
2079        repository_handle
2080            .update(&mut cx, |repository_handle, cx| {
2081                repository_handle.stash_drop(stash_index, cx)
2082            })
2083            .await??;
2084
2085        Ok(proto::Ack {})
2086    }
2087
2088    async fn handle_set_index_text(
2089        this: Entity<Self>,
2090        envelope: TypedEnvelope<proto::SetIndexText>,
2091        mut cx: AsyncApp,
2092    ) -> Result<proto::Ack> {
2093        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2094        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2095        let repo_path = RepoPath::from_proto(&envelope.payload.path)?;
2096
2097        repository_handle
2098            .update(&mut cx, |repository_handle, cx| {
2099                repository_handle.spawn_set_index_text_job(
2100                    repo_path,
2101                    envelope.payload.text,
2102                    None,
2103                    cx,
2104                )
2105            })
2106            .await??;
2107        Ok(proto::Ack {})
2108    }
2109
2110    async fn handle_run_hook(
2111        this: Entity<Self>,
2112        envelope: TypedEnvelope<proto::RunGitHook>,
2113        mut cx: AsyncApp,
2114    ) -> Result<proto::Ack> {
2115        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2116        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2117        let hook = RunHook::from_proto(envelope.payload.hook).context("invalid hook")?;
2118        repository_handle
2119            .update(&mut cx, |repository_handle, cx| {
2120                repository_handle.run_hook(hook, cx)
2121            })
2122            .await??;
2123        Ok(proto::Ack {})
2124    }
2125
2126    async fn handle_commit(
2127        this: Entity<Self>,
2128        envelope: TypedEnvelope<proto::Commit>,
2129        mut cx: AsyncApp,
2130    ) -> Result<proto::Ack> {
2131        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2132        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2133        let askpass_id = envelope.payload.askpass_id;
2134
2135        let askpass = make_remote_delegate(
2136            this,
2137            envelope.payload.project_id,
2138            repository_id,
2139            askpass_id,
2140            &mut cx,
2141        );
2142
2143        let message = SharedString::from(envelope.payload.message);
2144        let name = envelope.payload.name.map(SharedString::from);
2145        let email = envelope.payload.email.map(SharedString::from);
2146        let options = envelope.payload.options.unwrap_or_default();
2147
2148        repository_handle
2149            .update(&mut cx, |repository_handle, cx| {
2150                repository_handle.commit(
2151                    message,
2152                    name.zip(email),
2153                    CommitOptions {
2154                        amend: options.amend,
2155                        signoff: options.signoff,
2156                    },
2157                    askpass,
2158                    cx,
2159                )
2160            })
2161            .await??;
2162        Ok(proto::Ack {})
2163    }
2164
2165    async fn handle_get_remotes(
2166        this: Entity<Self>,
2167        envelope: TypedEnvelope<proto::GetRemotes>,
2168        mut cx: AsyncApp,
2169    ) -> Result<proto::GetRemotesResponse> {
2170        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2171        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2172
2173        let branch_name = envelope.payload.branch_name;
2174        let is_push = envelope.payload.is_push;
2175
2176        let remotes = repository_handle
2177            .update(&mut cx, |repository_handle, _| {
2178                repository_handle.get_remotes(branch_name, is_push)
2179            })
2180            .await??;
2181
2182        Ok(proto::GetRemotesResponse {
2183            remotes: remotes
2184                .into_iter()
2185                .map(|remotes| proto::get_remotes_response::Remote {
2186                    name: remotes.name.to_string(),
2187                })
2188                .collect::<Vec<_>>(),
2189        })
2190    }
2191
2192    async fn handle_get_worktrees(
2193        this: Entity<Self>,
2194        envelope: TypedEnvelope<proto::GitGetWorktrees>,
2195        mut cx: AsyncApp,
2196    ) -> Result<proto::GitWorktreesResponse> {
2197        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2198        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2199
2200        let worktrees = repository_handle
2201            .update(&mut cx, |repository_handle, _| {
2202                repository_handle.worktrees()
2203            })
2204            .await??;
2205
2206        Ok(proto::GitWorktreesResponse {
2207            worktrees: worktrees
2208                .into_iter()
2209                .map(|worktree| worktree_to_proto(&worktree))
2210                .collect::<Vec<_>>(),
2211        })
2212    }
2213
2214    async fn handle_create_worktree(
2215        this: Entity<Self>,
2216        envelope: TypedEnvelope<proto::GitCreateWorktree>,
2217        mut cx: AsyncApp,
2218    ) -> Result<proto::Ack> {
2219        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2220        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2221        let directory = PathBuf::from(envelope.payload.directory);
2222        let name = envelope.payload.name;
2223        let commit = envelope.payload.commit;
2224
2225        repository_handle
2226            .update(&mut cx, |repository_handle, _| {
2227                repository_handle.create_worktree(name, directory, commit)
2228            })
2229            .await??;
2230
2231        Ok(proto::Ack {})
2232    }
2233
2234    async fn handle_get_branches(
2235        this: Entity<Self>,
2236        envelope: TypedEnvelope<proto::GitGetBranches>,
2237        mut cx: AsyncApp,
2238    ) -> Result<proto::GitBranchesResponse> {
2239        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2240        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2241
2242        let branches = repository_handle
2243            .update(&mut cx, |repository_handle, _| repository_handle.branches())
2244            .await??;
2245
2246        Ok(proto::GitBranchesResponse {
2247            branches: branches
2248                .into_iter()
2249                .map(|branch| branch_to_proto(&branch))
2250                .collect::<Vec<_>>(),
2251        })
2252    }
2253    async fn handle_get_default_branch(
2254        this: Entity<Self>,
2255        envelope: TypedEnvelope<proto::GetDefaultBranch>,
2256        mut cx: AsyncApp,
2257    ) -> Result<proto::GetDefaultBranchResponse> {
2258        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2259        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2260
2261        let branch = repository_handle
2262            .update(&mut cx, |repository_handle, _| {
2263                repository_handle.default_branch(false)
2264            })
2265            .await??
2266            .map(Into::into);
2267
2268        Ok(proto::GetDefaultBranchResponse { branch })
2269    }
2270    async fn handle_create_branch(
2271        this: Entity<Self>,
2272        envelope: TypedEnvelope<proto::GitCreateBranch>,
2273        mut cx: AsyncApp,
2274    ) -> Result<proto::Ack> {
2275        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2276        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2277        let branch_name = envelope.payload.branch_name;
2278
2279        repository_handle
2280            .update(&mut cx, |repository_handle, _| {
2281                repository_handle.create_branch(branch_name, None)
2282            })
2283            .await??;
2284
2285        Ok(proto::Ack {})
2286    }
2287
2288    async fn handle_change_branch(
2289        this: Entity<Self>,
2290        envelope: TypedEnvelope<proto::GitChangeBranch>,
2291        mut cx: AsyncApp,
2292    ) -> Result<proto::Ack> {
2293        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2294        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2295        let branch_name = envelope.payload.branch_name;
2296
2297        repository_handle
2298            .update(&mut cx, |repository_handle, _| {
2299                repository_handle.change_branch(branch_name)
2300            })
2301            .await??;
2302
2303        Ok(proto::Ack {})
2304    }
2305
2306    async fn handle_rename_branch(
2307        this: Entity<Self>,
2308        envelope: TypedEnvelope<proto::GitRenameBranch>,
2309        mut cx: AsyncApp,
2310    ) -> Result<proto::Ack> {
2311        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2312        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2313        let branch = envelope.payload.branch;
2314        let new_name = envelope.payload.new_name;
2315
2316        repository_handle
2317            .update(&mut cx, |repository_handle, _| {
2318                repository_handle.rename_branch(branch, new_name)
2319            })
2320            .await??;
2321
2322        Ok(proto::Ack {})
2323    }
2324
2325    async fn handle_create_remote(
2326        this: Entity<Self>,
2327        envelope: TypedEnvelope<proto::GitCreateRemote>,
2328        mut cx: AsyncApp,
2329    ) -> Result<proto::Ack> {
2330        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2331        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2332        let remote_name = envelope.payload.remote_name;
2333        let remote_url = envelope.payload.remote_url;
2334
2335        repository_handle
2336            .update(&mut cx, |repository_handle, _| {
2337                repository_handle.create_remote(remote_name, remote_url)
2338            })
2339            .await??;
2340
2341        Ok(proto::Ack {})
2342    }
2343
2344    async fn handle_delete_branch(
2345        this: Entity<Self>,
2346        envelope: TypedEnvelope<proto::GitDeleteBranch>,
2347        mut cx: AsyncApp,
2348    ) -> Result<proto::Ack> {
2349        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2350        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2351        let branch_name = envelope.payload.branch_name;
2352
2353        repository_handle
2354            .update(&mut cx, |repository_handle, _| {
2355                repository_handle.delete_branch(branch_name)
2356            })
2357            .await??;
2358
2359        Ok(proto::Ack {})
2360    }
2361
2362    async fn handle_remove_remote(
2363        this: Entity<Self>,
2364        envelope: TypedEnvelope<proto::GitRemoveRemote>,
2365        mut cx: AsyncApp,
2366    ) -> Result<proto::Ack> {
2367        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2368        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2369        let remote_name = envelope.payload.remote_name;
2370
2371        repository_handle
2372            .update(&mut cx, |repository_handle, _| {
2373                repository_handle.remove_remote(remote_name)
2374            })
2375            .await??;
2376
2377        Ok(proto::Ack {})
2378    }
2379
2380    async fn handle_show(
2381        this: Entity<Self>,
2382        envelope: TypedEnvelope<proto::GitShow>,
2383        mut cx: AsyncApp,
2384    ) -> Result<proto::GitCommitDetails> {
2385        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2386        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2387
2388        let commit = repository_handle
2389            .update(&mut cx, |repository_handle, _| {
2390                repository_handle.show(envelope.payload.commit)
2391            })
2392            .await??;
2393        Ok(proto::GitCommitDetails {
2394            sha: commit.sha.into(),
2395            message: commit.message.into(),
2396            commit_timestamp: commit.commit_timestamp,
2397            author_email: commit.author_email.into(),
2398            author_name: commit.author_name.into(),
2399        })
2400    }
2401
2402    async fn handle_load_commit_diff(
2403        this: Entity<Self>,
2404        envelope: TypedEnvelope<proto::LoadCommitDiff>,
2405        mut cx: AsyncApp,
2406    ) -> Result<proto::LoadCommitDiffResponse> {
2407        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2408        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2409
2410        let commit_diff = repository_handle
2411            .update(&mut cx, |repository_handle, _| {
2412                repository_handle.load_commit_diff(envelope.payload.commit)
2413            })
2414            .await??;
2415        Ok(proto::LoadCommitDiffResponse {
2416            files: commit_diff
2417                .files
2418                .into_iter()
2419                .map(|file| proto::CommitFile {
2420                    path: file.path.to_proto(),
2421                    old_text: file.old_text,
2422                    new_text: file.new_text,
2423                    is_binary: file.is_binary,
2424                })
2425                .collect(),
2426        })
2427    }
2428
2429    async fn handle_file_history(
2430        this: Entity<Self>,
2431        envelope: TypedEnvelope<proto::GitFileHistory>,
2432        mut cx: AsyncApp,
2433    ) -> Result<proto::GitFileHistoryResponse> {
2434        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2435        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2436        let path = RepoPath::from_proto(&envelope.payload.path)?;
2437        let skip = envelope.payload.skip as usize;
2438        let limit = envelope.payload.limit.map(|l| l as usize);
2439
2440        let file_history = repository_handle
2441            .update(&mut cx, |repository_handle, _| {
2442                repository_handle.file_history_paginated(path, skip, limit)
2443            })
2444            .await??;
2445
2446        Ok(proto::GitFileHistoryResponse {
2447            entries: file_history
2448                .entries
2449                .into_iter()
2450                .map(|entry| proto::FileHistoryEntry {
2451                    sha: entry.sha.to_string(),
2452                    subject: entry.subject.to_string(),
2453                    message: entry.message.to_string(),
2454                    commit_timestamp: entry.commit_timestamp,
2455                    author_name: entry.author_name.to_string(),
2456                    author_email: entry.author_email.to_string(),
2457                })
2458                .collect(),
2459            path: file_history.path.to_proto(),
2460        })
2461    }
2462
2463    async fn handle_reset(
2464        this: Entity<Self>,
2465        envelope: TypedEnvelope<proto::GitReset>,
2466        mut cx: AsyncApp,
2467    ) -> Result<proto::Ack> {
2468        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2469        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2470
2471        let mode = match envelope.payload.mode() {
2472            git_reset::ResetMode::Soft => ResetMode::Soft,
2473            git_reset::ResetMode::Mixed => ResetMode::Mixed,
2474        };
2475
2476        repository_handle
2477            .update(&mut cx, |repository_handle, cx| {
2478                repository_handle.reset(envelope.payload.commit, mode, cx)
2479            })
2480            .await??;
2481        Ok(proto::Ack {})
2482    }
2483
2484    async fn handle_checkout_files(
2485        this: Entity<Self>,
2486        envelope: TypedEnvelope<proto::GitCheckoutFiles>,
2487        mut cx: AsyncApp,
2488    ) -> Result<proto::Ack> {
2489        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2490        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2491        let paths = envelope
2492            .payload
2493            .paths
2494            .iter()
2495            .map(|s| RepoPath::from_proto(s))
2496            .collect::<Result<Vec<_>>>()?;
2497
2498        repository_handle
2499            .update(&mut cx, |repository_handle, cx| {
2500                repository_handle.checkout_files(&envelope.payload.commit, paths, cx)
2501            })
2502            .await?;
2503        Ok(proto::Ack {})
2504    }
2505
2506    async fn handle_open_commit_message_buffer(
2507        this: Entity<Self>,
2508        envelope: TypedEnvelope<proto::OpenCommitMessageBuffer>,
2509        mut cx: AsyncApp,
2510    ) -> Result<proto::OpenBufferResponse> {
2511        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2512        let repository = Self::repository_for_request(&this, repository_id, &mut cx)?;
2513        let buffer = repository
2514            .update(&mut cx, |repository, cx| {
2515                repository.open_commit_buffer(None, this.read(cx).buffer_store.clone(), cx)
2516            })
2517            .await?;
2518
2519        let buffer_id = buffer.read_with(&cx, |buffer, _| buffer.remote_id());
2520        this.update(&mut cx, |this, cx| {
2521            this.buffer_store.update(cx, |buffer_store, cx| {
2522                buffer_store
2523                    .create_buffer_for_peer(
2524                        &buffer,
2525                        envelope.original_sender_id.unwrap_or(envelope.sender_id),
2526                        cx,
2527                    )
2528                    .detach_and_log_err(cx);
2529            })
2530        });
2531
2532        Ok(proto::OpenBufferResponse {
2533            buffer_id: buffer_id.to_proto(),
2534        })
2535    }
2536
2537    async fn handle_askpass(
2538        this: Entity<Self>,
2539        envelope: TypedEnvelope<proto::AskPassRequest>,
2540        mut cx: AsyncApp,
2541    ) -> Result<proto::AskPassResponse> {
2542        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2543        let repository = Self::repository_for_request(&this, repository_id, &mut cx)?;
2544
2545        let delegates = cx.update(|cx| repository.read(cx).askpass_delegates.clone());
2546        let Some(mut askpass) = delegates.lock().remove(&envelope.payload.askpass_id) else {
2547            debug_panic!("no askpass found");
2548            anyhow::bail!("no askpass found");
2549        };
2550
2551        let response = askpass
2552            .ask_password(envelope.payload.prompt)
2553            .await
2554            .ok_or_else(|| anyhow::anyhow!("askpass cancelled"))?;
2555
2556        delegates
2557            .lock()
2558            .insert(envelope.payload.askpass_id, askpass);
2559
2560        // In fact, we don't quite know what we're doing here, as we're sending askpass password unencrypted, but..
2561        Ok(proto::AskPassResponse {
2562            response: response.decrypt(IKnowWhatIAmDoingAndIHaveReadTheDocs)?,
2563        })
2564    }
2565
2566    async fn handle_check_for_pushed_commits(
2567        this: Entity<Self>,
2568        envelope: TypedEnvelope<proto::CheckForPushedCommits>,
2569        mut cx: AsyncApp,
2570    ) -> Result<proto::CheckForPushedCommitsResponse> {
2571        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2572        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2573
2574        let branches = repository_handle
2575            .update(&mut cx, |repository_handle, _| {
2576                repository_handle.check_for_pushed_commits()
2577            })
2578            .await??;
2579        Ok(proto::CheckForPushedCommitsResponse {
2580            pushed_to: branches
2581                .into_iter()
2582                .map(|commit| commit.to_string())
2583                .collect(),
2584        })
2585    }
2586
2587    async fn handle_git_diff(
2588        this: Entity<Self>,
2589        envelope: TypedEnvelope<proto::GitDiff>,
2590        mut cx: AsyncApp,
2591    ) -> Result<proto::GitDiffResponse> {
2592        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2593        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2594        let diff_type = match envelope.payload.diff_type() {
2595            proto::git_diff::DiffType::HeadToIndex => DiffType::HeadToIndex,
2596            proto::git_diff::DiffType::HeadToWorktree => DiffType::HeadToWorktree,
2597        };
2598
2599        let mut diff = repository_handle
2600            .update(&mut cx, |repository_handle, cx| {
2601                repository_handle.diff(diff_type, cx)
2602            })
2603            .await??;
2604        const ONE_MB: usize = 1_000_000;
2605        if diff.len() > ONE_MB {
2606            diff = diff.chars().take(ONE_MB).collect()
2607        }
2608
2609        Ok(proto::GitDiffResponse { diff })
2610    }
2611
2612    async fn handle_tree_diff(
2613        this: Entity<Self>,
2614        request: TypedEnvelope<proto::GetTreeDiff>,
2615        mut cx: AsyncApp,
2616    ) -> Result<proto::GetTreeDiffResponse> {
2617        let repository_id = RepositoryId(request.payload.repository_id);
2618        let diff_type = if request.payload.is_merge {
2619            DiffTreeType::MergeBase {
2620                base: request.payload.base.into(),
2621                head: request.payload.head.into(),
2622            }
2623        } else {
2624            DiffTreeType::Since {
2625                base: request.payload.base.into(),
2626                head: request.payload.head.into(),
2627            }
2628        };
2629
2630        let diff = this
2631            .update(&mut cx, |this, cx| {
2632                let repository = this.repositories().get(&repository_id)?;
2633                Some(repository.update(cx, |repo, cx| repo.diff_tree(diff_type, cx)))
2634            })
2635            .context("missing repository")?
2636            .await??;
2637
2638        Ok(proto::GetTreeDiffResponse {
2639            entries: diff
2640                .entries
2641                .into_iter()
2642                .map(|(path, status)| proto::TreeDiffStatus {
2643                    path: path.as_ref().to_proto(),
2644                    status: match status {
2645                        TreeDiffStatus::Added {} => proto::tree_diff_status::Status::Added.into(),
2646                        TreeDiffStatus::Modified { .. } => {
2647                            proto::tree_diff_status::Status::Modified.into()
2648                        }
2649                        TreeDiffStatus::Deleted { .. } => {
2650                            proto::tree_diff_status::Status::Deleted.into()
2651                        }
2652                    },
2653                    oid: match status {
2654                        TreeDiffStatus::Deleted { old } | TreeDiffStatus::Modified { old } => {
2655                            Some(old.to_string())
2656                        }
2657                        TreeDiffStatus::Added => None,
2658                    },
2659                })
2660                .collect(),
2661        })
2662    }
2663
2664    async fn handle_get_blob_content(
2665        this: Entity<Self>,
2666        request: TypedEnvelope<proto::GetBlobContent>,
2667        mut cx: AsyncApp,
2668    ) -> Result<proto::GetBlobContentResponse> {
2669        let oid = git::Oid::from_str(&request.payload.oid)?;
2670        let repository_id = RepositoryId(request.payload.repository_id);
2671        let content = this
2672            .update(&mut cx, |this, cx| {
2673                let repository = this.repositories().get(&repository_id)?;
2674                Some(repository.update(cx, |repo, cx| repo.load_blob_content(oid, cx)))
2675            })
2676            .context("missing repository")?
2677            .await?;
2678        Ok(proto::GetBlobContentResponse { content })
2679    }
2680
2681    async fn handle_open_unstaged_diff(
2682        this: Entity<Self>,
2683        request: TypedEnvelope<proto::OpenUnstagedDiff>,
2684        mut cx: AsyncApp,
2685    ) -> Result<proto::OpenUnstagedDiffResponse> {
2686        let buffer_id = BufferId::new(request.payload.buffer_id)?;
2687        let diff = this
2688            .update(&mut cx, |this, cx| {
2689                let buffer = this.buffer_store.read(cx).get(buffer_id)?;
2690                Some(this.open_unstaged_diff(buffer, cx))
2691            })
2692            .context("missing buffer")?
2693            .await?;
2694        this.update(&mut cx, |this, _| {
2695            let shared_diffs = this
2696                .shared_diffs
2697                .entry(request.original_sender_id.unwrap_or(request.sender_id))
2698                .or_default();
2699            shared_diffs.entry(buffer_id).or_default().unstaged = Some(diff.clone());
2700        });
2701        let staged_text = diff.read_with(&cx, |diff, cx| diff.base_text_string(cx));
2702        Ok(proto::OpenUnstagedDiffResponse { staged_text })
2703    }
2704
2705    async fn handle_open_uncommitted_diff(
2706        this: Entity<Self>,
2707        request: TypedEnvelope<proto::OpenUncommittedDiff>,
2708        mut cx: AsyncApp,
2709    ) -> Result<proto::OpenUncommittedDiffResponse> {
2710        let buffer_id = BufferId::new(request.payload.buffer_id)?;
2711        let diff = this
2712            .update(&mut cx, |this, cx| {
2713                let buffer = this.buffer_store.read(cx).get(buffer_id)?;
2714                Some(this.open_uncommitted_diff(buffer, cx))
2715            })
2716            .context("missing buffer")?
2717            .await?;
2718        this.update(&mut cx, |this, _| {
2719            let shared_diffs = this
2720                .shared_diffs
2721                .entry(request.original_sender_id.unwrap_or(request.sender_id))
2722                .or_default();
2723            shared_diffs.entry(buffer_id).or_default().uncommitted = Some(diff.clone());
2724        });
2725        Ok(diff.read_with(&cx, |diff, cx| {
2726            use proto::open_uncommitted_diff_response::Mode;
2727
2728            let unstaged_diff = diff.secondary_diff();
2729            let index_snapshot = unstaged_diff.and_then(|diff| {
2730                let diff = diff.read(cx);
2731                diff.base_text_exists().then(|| diff.base_text(cx))
2732            });
2733
2734            let mode;
2735            let staged_text;
2736            let committed_text;
2737            if diff.base_text_exists() {
2738                let committed_snapshot = diff.base_text(cx);
2739                committed_text = Some(committed_snapshot.text());
2740                if let Some(index_text) = index_snapshot {
2741                    if index_text.remote_id() == committed_snapshot.remote_id() {
2742                        mode = Mode::IndexMatchesHead;
2743                        staged_text = None;
2744                    } else {
2745                        mode = Mode::IndexAndHead;
2746                        staged_text = Some(index_text.text());
2747                    }
2748                } else {
2749                    mode = Mode::IndexAndHead;
2750                    staged_text = None;
2751                }
2752            } else {
2753                mode = Mode::IndexAndHead;
2754                committed_text = None;
2755                staged_text = index_snapshot.as_ref().map(|buffer| buffer.text());
2756            }
2757
2758            proto::OpenUncommittedDiffResponse {
2759                committed_text,
2760                staged_text,
2761                mode: mode.into(),
2762            }
2763        }))
2764    }
2765
2766    async fn handle_update_diff_bases(
2767        this: Entity<Self>,
2768        request: TypedEnvelope<proto::UpdateDiffBases>,
2769        mut cx: AsyncApp,
2770    ) -> Result<()> {
2771        let buffer_id = BufferId::new(request.payload.buffer_id)?;
2772        this.update(&mut cx, |this, cx| {
2773            if let Some(diff_state) = this.diffs.get_mut(&buffer_id)
2774                && let Some(buffer) = this.buffer_store.read(cx).get(buffer_id)
2775            {
2776                let buffer = buffer.read(cx).text_snapshot();
2777                diff_state.update(cx, |diff_state, cx| {
2778                    diff_state.handle_base_texts_updated(buffer, request.payload, cx);
2779                })
2780            }
2781        });
2782        Ok(())
2783    }
2784
2785    async fn handle_blame_buffer(
2786        this: Entity<Self>,
2787        envelope: TypedEnvelope<proto::BlameBuffer>,
2788        mut cx: AsyncApp,
2789    ) -> Result<proto::BlameBufferResponse> {
2790        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2791        let version = deserialize_version(&envelope.payload.version);
2792        let buffer = this.read_with(&cx, |this, cx| {
2793            this.buffer_store.read(cx).get_existing(buffer_id)
2794        })?;
2795        buffer
2796            .update(&mut cx, |buffer, _| {
2797                buffer.wait_for_version(version.clone())
2798            })
2799            .await?;
2800        let blame = this
2801            .update(&mut cx, |this, cx| {
2802                this.blame_buffer(&buffer, Some(version), cx)
2803            })
2804            .await?;
2805        Ok(serialize_blame_buffer_response(blame))
2806    }
2807
2808    async fn handle_get_permalink_to_line(
2809        this: Entity<Self>,
2810        envelope: TypedEnvelope<proto::GetPermalinkToLine>,
2811        mut cx: AsyncApp,
2812    ) -> Result<proto::GetPermalinkToLineResponse> {
2813        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2814        // let version = deserialize_version(&envelope.payload.version);
2815        let selection = {
2816            let proto_selection = envelope
2817                .payload
2818                .selection
2819                .context("no selection to get permalink for defined")?;
2820            proto_selection.start as u32..proto_selection.end as u32
2821        };
2822        let buffer = this.read_with(&cx, |this, cx| {
2823            this.buffer_store.read(cx).get_existing(buffer_id)
2824        })?;
2825        let permalink = this
2826            .update(&mut cx, |this, cx| {
2827                this.get_permalink_to_line(&buffer, selection, cx)
2828            })
2829            .await?;
2830        Ok(proto::GetPermalinkToLineResponse {
2831            permalink: permalink.to_string(),
2832        })
2833    }
2834
2835    fn repository_for_request(
2836        this: &Entity<Self>,
2837        id: RepositoryId,
2838        cx: &mut AsyncApp,
2839    ) -> Result<Entity<Repository>> {
2840        this.read_with(cx, |this, _| {
2841            this.repositories
2842                .get(&id)
2843                .context("missing repository handle")
2844                .cloned()
2845        })
2846    }
2847
2848    pub fn repo_snapshots(&self, cx: &App) -> HashMap<RepositoryId, RepositorySnapshot> {
2849        self.repositories
2850            .iter()
2851            .map(|(id, repo)| (*id, repo.read(cx).snapshot.clone()))
2852            .collect()
2853    }
2854
2855    fn process_updated_entries(
2856        &self,
2857        worktree: &Entity<Worktree>,
2858        updated_entries: &[(Arc<RelPath>, ProjectEntryId, PathChange)],
2859        cx: &mut App,
2860    ) -> Task<HashMap<Entity<Repository>, Vec<RepoPath>>> {
2861        let path_style = worktree.read(cx).path_style();
2862        let mut repo_paths = self
2863            .repositories
2864            .values()
2865            .map(|repo| (repo.read(cx).work_directory_abs_path.clone(), repo.clone()))
2866            .collect::<Vec<_>>();
2867        let mut entries: Vec<_> = updated_entries
2868            .iter()
2869            .map(|(path, _, _)| path.clone())
2870            .collect();
2871        entries.sort();
2872        let worktree = worktree.read(cx);
2873
2874        let entries = entries
2875            .into_iter()
2876            .map(|path| worktree.absolutize(&path))
2877            .collect::<Arc<[_]>>();
2878
2879        let executor = cx.background_executor().clone();
2880        cx.background_executor().spawn(async move {
2881            repo_paths.sort_by(|lhs, rhs| lhs.0.cmp(&rhs.0));
2882            let mut paths_by_git_repo = HashMap::<_, Vec<_>>::default();
2883            let mut tasks = FuturesOrdered::new();
2884            for (repo_path, repo) in repo_paths.into_iter().rev() {
2885                let entries = entries.clone();
2886                let task = executor.spawn(async move {
2887                    // Find all repository paths that belong to this repo
2888                    let mut ix = entries.partition_point(|path| path < &*repo_path);
2889                    if ix == entries.len() {
2890                        return None;
2891                    };
2892
2893                    let mut paths = Vec::new();
2894                    // All paths prefixed by a given repo will constitute a continuous range.
2895                    while let Some(path) = entries.get(ix)
2896                        && let Some(repo_path) = RepositorySnapshot::abs_path_to_repo_path_inner(
2897                            &repo_path, path, path_style,
2898                        )
2899                    {
2900                        paths.push((repo_path, ix));
2901                        ix += 1;
2902                    }
2903                    if paths.is_empty() {
2904                        None
2905                    } else {
2906                        Some((repo, paths))
2907                    }
2908                });
2909                tasks.push_back(task);
2910            }
2911
2912            // Now, let's filter out the "duplicate" entries that were processed by multiple distinct repos.
2913            let mut path_was_used = vec![false; entries.len()];
2914            let tasks = tasks.collect::<Vec<_>>().await;
2915            // Process tasks from the back: iterating backwards allows us to see more-specific paths first.
2916            // We always want to assign a path to it's innermost repository.
2917            for t in tasks {
2918                let Some((repo, paths)) = t else {
2919                    continue;
2920                };
2921                let entry = paths_by_git_repo.entry(repo).or_default();
2922                for (repo_path, ix) in paths {
2923                    if path_was_used[ix] {
2924                        continue;
2925                    }
2926                    path_was_used[ix] = true;
2927                    entry.push(repo_path);
2928                }
2929            }
2930
2931            paths_by_git_repo
2932        })
2933    }
2934}
2935
2936impl BufferGitState {
2937    fn new(_git_store: WeakEntity<GitStore>) -> Self {
2938        Self {
2939            unstaged_diff: Default::default(),
2940            uncommitted_diff: Default::default(),
2941            recalculate_diff_task: Default::default(),
2942            language: Default::default(),
2943            language_registry: Default::default(),
2944            recalculating_tx: postage::watch::channel_with(false).0,
2945            hunk_staging_operation_count: 0,
2946            hunk_staging_operation_count_as_of_write: 0,
2947            head_text: Default::default(),
2948            index_text: Default::default(),
2949            head_changed: Default::default(),
2950            index_changed: Default::default(),
2951            language_changed: Default::default(),
2952            conflict_updated_futures: Default::default(),
2953            conflict_set: Default::default(),
2954            reparse_conflict_markers_task: Default::default(),
2955        }
2956    }
2957
2958    #[ztracing::instrument(skip_all)]
2959    fn buffer_language_changed(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) {
2960        self.language = buffer.read(cx).language().cloned();
2961        self.language_changed = true;
2962        let _ = self.recalculate_diffs(buffer.read(cx).text_snapshot(), cx);
2963    }
2964
2965    fn reparse_conflict_markers(
2966        &mut self,
2967        buffer: text::BufferSnapshot,
2968        cx: &mut Context<Self>,
2969    ) -> oneshot::Receiver<()> {
2970        let (tx, rx) = oneshot::channel();
2971
2972        let Some(conflict_set) = self
2973            .conflict_set
2974            .as_ref()
2975            .and_then(|conflict_set| conflict_set.upgrade())
2976        else {
2977            return rx;
2978        };
2979
2980        let old_snapshot = conflict_set.read_with(cx, |conflict_set, _| {
2981            if conflict_set.has_conflict {
2982                Some(conflict_set.snapshot())
2983            } else {
2984                None
2985            }
2986        });
2987
2988        if let Some(old_snapshot) = old_snapshot {
2989            self.conflict_updated_futures.push(tx);
2990            self.reparse_conflict_markers_task = Some(cx.spawn(async move |this, cx| {
2991                let (snapshot, changed_range) = cx
2992                    .background_spawn(async move {
2993                        let new_snapshot = ConflictSet::parse(&buffer);
2994                        let changed_range = old_snapshot.compare(&new_snapshot, &buffer);
2995                        (new_snapshot, changed_range)
2996                    })
2997                    .await;
2998                this.update(cx, |this, cx| {
2999                    if let Some(conflict_set) = &this.conflict_set {
3000                        conflict_set
3001                            .update(cx, |conflict_set, cx| {
3002                                conflict_set.set_snapshot(snapshot, changed_range, cx);
3003                            })
3004                            .ok();
3005                    }
3006                    let futures = std::mem::take(&mut this.conflict_updated_futures);
3007                    for tx in futures {
3008                        tx.send(()).ok();
3009                    }
3010                })
3011            }))
3012        }
3013
3014        rx
3015    }
3016
3017    fn unstaged_diff(&self) -> Option<Entity<BufferDiff>> {
3018        self.unstaged_diff.as_ref().and_then(|set| set.upgrade())
3019    }
3020
3021    fn uncommitted_diff(&self) -> Option<Entity<BufferDiff>> {
3022        self.uncommitted_diff.as_ref().and_then(|set| set.upgrade())
3023    }
3024
3025    fn handle_base_texts_updated(
3026        &mut self,
3027        buffer: text::BufferSnapshot,
3028        message: proto::UpdateDiffBases,
3029        cx: &mut Context<Self>,
3030    ) {
3031        use proto::update_diff_bases::Mode;
3032
3033        let Some(mode) = Mode::from_i32(message.mode) else {
3034            return;
3035        };
3036
3037        let diff_bases_change = match mode {
3038            Mode::HeadOnly => DiffBasesChange::SetHead(message.committed_text),
3039            Mode::IndexOnly => DiffBasesChange::SetIndex(message.staged_text),
3040            Mode::IndexMatchesHead => DiffBasesChange::SetBoth(message.committed_text),
3041            Mode::IndexAndHead => DiffBasesChange::SetEach {
3042                index: message.staged_text,
3043                head: message.committed_text,
3044            },
3045        };
3046
3047        self.diff_bases_changed(buffer, Some(diff_bases_change), cx);
3048    }
3049
3050    pub fn wait_for_recalculation(&mut self) -> Option<impl Future<Output = ()> + use<>> {
3051        if *self.recalculating_tx.borrow() {
3052            let mut rx = self.recalculating_tx.subscribe();
3053            Some(async move {
3054                loop {
3055                    let is_recalculating = rx.recv().await;
3056                    if is_recalculating != Some(true) {
3057                        break;
3058                    }
3059                }
3060            })
3061        } else {
3062            None
3063        }
3064    }
3065
3066    fn diff_bases_changed(
3067        &mut self,
3068        buffer: text::BufferSnapshot,
3069        diff_bases_change: Option<DiffBasesChange>,
3070        cx: &mut Context<Self>,
3071    ) {
3072        match diff_bases_change {
3073            Some(DiffBasesChange::SetIndex(index)) => {
3074                self.index_text = index.map(|mut index| {
3075                    text::LineEnding::normalize(&mut index);
3076                    Arc::from(index.as_str())
3077                });
3078                self.index_changed = true;
3079            }
3080            Some(DiffBasesChange::SetHead(head)) => {
3081                self.head_text = head.map(|mut head| {
3082                    text::LineEnding::normalize(&mut head);
3083                    Arc::from(head.as_str())
3084                });
3085                self.head_changed = true;
3086            }
3087            Some(DiffBasesChange::SetBoth(text)) => {
3088                let text = text.map(|mut text| {
3089                    text::LineEnding::normalize(&mut text);
3090                    Arc::from(text.as_str())
3091                });
3092                self.head_text = text.clone();
3093                self.index_text = text;
3094                self.head_changed = true;
3095                self.index_changed = true;
3096            }
3097            Some(DiffBasesChange::SetEach { index, head }) => {
3098                self.index_text = index.map(|mut index| {
3099                    text::LineEnding::normalize(&mut index);
3100                    Arc::from(index.as_str())
3101                });
3102                self.index_changed = true;
3103                self.head_text = head.map(|mut head| {
3104                    text::LineEnding::normalize(&mut head);
3105                    Arc::from(head.as_str())
3106                });
3107                self.head_changed = true;
3108            }
3109            None => {}
3110        }
3111
3112        self.recalculate_diffs(buffer, cx)
3113    }
3114
3115    #[ztracing::instrument(skip_all)]
3116    fn recalculate_diffs(&mut self, buffer: text::BufferSnapshot, cx: &mut Context<Self>) {
3117        *self.recalculating_tx.borrow_mut() = true;
3118
3119        let language = self.language.clone();
3120        let language_registry = self.language_registry.clone();
3121        let unstaged_diff = self.unstaged_diff();
3122        let uncommitted_diff = self.uncommitted_diff();
3123        let head = self.head_text.clone();
3124        let index = self.index_text.clone();
3125        let index_changed = self.index_changed;
3126        let head_changed = self.head_changed;
3127        let language_changed = self.language_changed;
3128        let prev_hunk_staging_operation_count = self.hunk_staging_operation_count_as_of_write;
3129        let index_matches_head = match (self.index_text.as_ref(), self.head_text.as_ref()) {
3130            (Some(index), Some(head)) => Arc::ptr_eq(index, head),
3131            (None, None) => true,
3132            _ => false,
3133        };
3134        self.recalculate_diff_task = Some(cx.spawn(async move |this, cx| {
3135            log::debug!(
3136                "start recalculating diffs for buffer {}",
3137                buffer.remote_id()
3138            );
3139
3140            let mut new_unstaged_diff = None;
3141            if let Some(unstaged_diff) = &unstaged_diff {
3142                new_unstaged_diff = Some(
3143                    cx.update(|cx| {
3144                        unstaged_diff.read(cx).update_diff(
3145                            buffer.clone(),
3146                            index,
3147                            index_changed.then_some(false),
3148                            language.clone(),
3149                            cx,
3150                        )
3151                    })
3152                    .await,
3153                );
3154            }
3155
3156            // Dropping BufferDiff can be expensive, so yield back to the event loop
3157            // for a bit
3158            yield_now().await;
3159
3160            let mut new_uncommitted_diff = None;
3161            if let Some(uncommitted_diff) = &uncommitted_diff {
3162                new_uncommitted_diff = if index_matches_head {
3163                    new_unstaged_diff.clone()
3164                } else {
3165                    Some(
3166                        cx.update(|cx| {
3167                            uncommitted_diff.read(cx).update_diff(
3168                                buffer.clone(),
3169                                head,
3170                                head_changed.then_some(true),
3171                                language.clone(),
3172                                cx,
3173                            )
3174                        })
3175                        .await,
3176                    )
3177                }
3178            }
3179
3180            // Dropping BufferDiff can be expensive, so yield back to the event loop
3181            // for a bit
3182            yield_now().await;
3183
3184            let cancel = this.update(cx, |this, _| {
3185                // This checks whether all pending stage/unstage operations
3186                // have quiesced (i.e. both the corresponding write and the
3187                // read of that write have completed). If not, then we cancel
3188                // this recalculation attempt to avoid invalidating pending
3189                // state too quickly; another recalculation will come along
3190                // later and clear the pending state once the state of the index has settled.
3191                if this.hunk_staging_operation_count > prev_hunk_staging_operation_count {
3192                    *this.recalculating_tx.borrow_mut() = false;
3193                    true
3194                } else {
3195                    false
3196                }
3197            })?;
3198            if cancel {
3199                log::debug!(
3200                    concat!(
3201                        "aborting recalculating diffs for buffer {}",
3202                        "due to subsequent hunk operations",
3203                    ),
3204                    buffer.remote_id()
3205                );
3206                return Ok(());
3207            }
3208
3209            let unstaged_changed_range = if let Some((unstaged_diff, new_unstaged_diff)) =
3210                unstaged_diff.as_ref().zip(new_unstaged_diff.clone())
3211            {
3212                let task = unstaged_diff.update(cx, |diff, cx| {
3213                    if language_changed {
3214                        diff.language_changed(language.clone(), language_registry.clone(), cx);
3215                    }
3216                    diff.set_snapshot(new_unstaged_diff, &buffer, cx)
3217                });
3218                Some(task.await)
3219            } else {
3220                None
3221            };
3222
3223            yield_now().await;
3224
3225            if let Some((uncommitted_diff, new_uncommitted_diff)) =
3226                uncommitted_diff.as_ref().zip(new_uncommitted_diff.clone())
3227            {
3228                uncommitted_diff
3229                    .update(cx, |diff, cx| {
3230                        if language_changed {
3231                            diff.language_changed(language, language_registry, cx);
3232                        }
3233                        diff.set_snapshot_with_secondary(
3234                            new_uncommitted_diff,
3235                            &buffer,
3236                            unstaged_changed_range.flatten(),
3237                            true,
3238                            cx,
3239                        )
3240                    })
3241                    .await;
3242            }
3243
3244            log::debug!(
3245                "finished recalculating diffs for buffer {}",
3246                buffer.remote_id()
3247            );
3248
3249            if let Some(this) = this.upgrade() {
3250                this.update(cx, |this, _| {
3251                    this.index_changed = false;
3252                    this.head_changed = false;
3253                    this.language_changed = false;
3254                    *this.recalculating_tx.borrow_mut() = false;
3255                });
3256            }
3257
3258            Ok(())
3259        }));
3260    }
3261}
3262
3263fn make_remote_delegate(
3264    this: Entity<GitStore>,
3265    project_id: u64,
3266    repository_id: RepositoryId,
3267    askpass_id: u64,
3268    cx: &mut AsyncApp,
3269) -> AskPassDelegate {
3270    AskPassDelegate::new(cx, move |prompt, tx, cx| {
3271        this.update(cx, |this, cx| {
3272            let Some((client, _)) = this.downstream_client() else {
3273                return;
3274            };
3275            let response = client.request(proto::AskPassRequest {
3276                project_id,
3277                repository_id: repository_id.to_proto(),
3278                askpass_id,
3279                prompt,
3280            });
3281            cx.spawn(async move |_, _| {
3282                let mut response = response.await?.response;
3283                tx.send(EncryptedPassword::try_from(response.as_ref())?)
3284                    .ok();
3285                response.zeroize();
3286                anyhow::Ok(())
3287            })
3288            .detach_and_log_err(cx);
3289        });
3290    })
3291}
3292
3293impl RepositoryId {
3294    pub fn to_proto(self) -> u64 {
3295        self.0
3296    }
3297
3298    pub fn from_proto(id: u64) -> Self {
3299        RepositoryId(id)
3300    }
3301}
3302
3303impl RepositorySnapshot {
3304    fn empty(id: RepositoryId, work_directory_abs_path: Arc<Path>, path_style: PathStyle) -> Self {
3305        Self {
3306            id,
3307            statuses_by_path: Default::default(),
3308            work_directory_abs_path,
3309            branch: None,
3310            head_commit: None,
3311            scan_id: 0,
3312            merge: Default::default(),
3313            remote_origin_url: None,
3314            remote_upstream_url: None,
3315            stash_entries: Default::default(),
3316            path_style,
3317        }
3318    }
3319
3320    fn initial_update(&self, project_id: u64) -> proto::UpdateRepository {
3321        proto::UpdateRepository {
3322            branch_summary: self.branch.as_ref().map(branch_to_proto),
3323            head_commit_details: self.head_commit.as_ref().map(commit_details_to_proto),
3324            updated_statuses: self
3325                .statuses_by_path
3326                .iter()
3327                .map(|entry| entry.to_proto())
3328                .collect(),
3329            removed_statuses: Default::default(),
3330            current_merge_conflicts: self
3331                .merge
3332                .conflicted_paths
3333                .iter()
3334                .map(|repo_path| repo_path.to_proto())
3335                .collect(),
3336            merge_message: self.merge.message.as_ref().map(|msg| msg.to_string()),
3337            project_id,
3338            id: self.id.to_proto(),
3339            abs_path: self.work_directory_abs_path.to_string_lossy().into_owned(),
3340            entry_ids: vec![self.id.to_proto()],
3341            scan_id: self.scan_id,
3342            is_last_update: true,
3343            stash_entries: self
3344                .stash_entries
3345                .entries
3346                .iter()
3347                .map(stash_to_proto)
3348                .collect(),
3349            remote_upstream_url: self.remote_upstream_url.clone(),
3350            remote_origin_url: self.remote_origin_url.clone(),
3351        }
3352    }
3353
3354    fn build_update(&self, old: &Self, project_id: u64) -> proto::UpdateRepository {
3355        let mut updated_statuses: Vec<proto::StatusEntry> = Vec::new();
3356        let mut removed_statuses: Vec<String> = Vec::new();
3357
3358        let mut new_statuses = self.statuses_by_path.iter().peekable();
3359        let mut old_statuses = old.statuses_by_path.iter().peekable();
3360
3361        let mut current_new_entry = new_statuses.next();
3362        let mut current_old_entry = old_statuses.next();
3363        loop {
3364            match (current_new_entry, current_old_entry) {
3365                (Some(new_entry), Some(old_entry)) => {
3366                    match new_entry.repo_path.cmp(&old_entry.repo_path) {
3367                        Ordering::Less => {
3368                            updated_statuses.push(new_entry.to_proto());
3369                            current_new_entry = new_statuses.next();
3370                        }
3371                        Ordering::Equal => {
3372                            if new_entry.status != old_entry.status {
3373                                updated_statuses.push(new_entry.to_proto());
3374                            }
3375                            current_old_entry = old_statuses.next();
3376                            current_new_entry = new_statuses.next();
3377                        }
3378                        Ordering::Greater => {
3379                            removed_statuses.push(old_entry.repo_path.to_proto());
3380                            current_old_entry = old_statuses.next();
3381                        }
3382                    }
3383                }
3384                (None, Some(old_entry)) => {
3385                    removed_statuses.push(old_entry.repo_path.to_proto());
3386                    current_old_entry = old_statuses.next();
3387                }
3388                (Some(new_entry), None) => {
3389                    updated_statuses.push(new_entry.to_proto());
3390                    current_new_entry = new_statuses.next();
3391                }
3392                (None, None) => break,
3393            }
3394        }
3395
3396        proto::UpdateRepository {
3397            branch_summary: self.branch.as_ref().map(branch_to_proto),
3398            head_commit_details: self.head_commit.as_ref().map(commit_details_to_proto),
3399            updated_statuses,
3400            removed_statuses,
3401            current_merge_conflicts: self
3402                .merge
3403                .conflicted_paths
3404                .iter()
3405                .map(|path| path.to_proto())
3406                .collect(),
3407            merge_message: self.merge.message.as_ref().map(|msg| msg.to_string()),
3408            project_id,
3409            id: self.id.to_proto(),
3410            abs_path: self.work_directory_abs_path.to_string_lossy().into_owned(),
3411            entry_ids: vec![],
3412            scan_id: self.scan_id,
3413            is_last_update: true,
3414            stash_entries: self
3415                .stash_entries
3416                .entries
3417                .iter()
3418                .map(stash_to_proto)
3419                .collect(),
3420            remote_upstream_url: self.remote_upstream_url.clone(),
3421            remote_origin_url: self.remote_origin_url.clone(),
3422        }
3423    }
3424
3425    pub fn status(&self) -> impl Iterator<Item = StatusEntry> + '_ {
3426        self.statuses_by_path.iter().cloned()
3427    }
3428
3429    pub fn status_summary(&self) -> GitSummary {
3430        self.statuses_by_path.summary().item_summary
3431    }
3432
3433    pub fn status_for_path(&self, path: &RepoPath) -> Option<StatusEntry> {
3434        self.statuses_by_path
3435            .get(&PathKey(path.as_ref().clone()), ())
3436            .cloned()
3437    }
3438
3439    pub fn abs_path_to_repo_path(&self, abs_path: &Path) -> Option<RepoPath> {
3440        Self::abs_path_to_repo_path_inner(&self.work_directory_abs_path, abs_path, self.path_style)
3441    }
3442
3443    fn repo_path_to_abs_path(&self, repo_path: &RepoPath) -> PathBuf {
3444        self.path_style
3445            .join(&self.work_directory_abs_path, repo_path.as_std_path())
3446            .unwrap()
3447            .into()
3448    }
3449
3450    #[inline]
3451    fn abs_path_to_repo_path_inner(
3452        work_directory_abs_path: &Path,
3453        abs_path: &Path,
3454        path_style: PathStyle,
3455    ) -> Option<RepoPath> {
3456        let rel_path = path_style.strip_prefix(abs_path, work_directory_abs_path)?;
3457        Some(RepoPath::from_rel_path(&rel_path))
3458    }
3459
3460    pub fn had_conflict_on_last_merge_head_change(&self, repo_path: &RepoPath) -> bool {
3461        self.merge.conflicted_paths.contains(repo_path)
3462    }
3463
3464    pub fn has_conflict(&self, repo_path: &RepoPath) -> bool {
3465        let had_conflict_on_last_merge_head_change =
3466            self.merge.conflicted_paths.contains(repo_path);
3467        let has_conflict_currently = self
3468            .status_for_path(repo_path)
3469            .is_some_and(|entry| entry.status.is_conflicted());
3470        had_conflict_on_last_merge_head_change || has_conflict_currently
3471    }
3472
3473    /// This is the name that will be displayed in the repository selector for this repository.
3474    pub fn display_name(&self) -> SharedString {
3475        self.work_directory_abs_path
3476            .file_name()
3477            .unwrap_or_default()
3478            .to_string_lossy()
3479            .to_string()
3480            .into()
3481    }
3482}
3483
3484pub fn stash_to_proto(entry: &StashEntry) -> proto::StashEntry {
3485    proto::StashEntry {
3486        oid: entry.oid.as_bytes().to_vec(),
3487        message: entry.message.clone(),
3488        branch: entry.branch.clone(),
3489        index: entry.index as u64,
3490        timestamp: entry.timestamp,
3491    }
3492}
3493
3494pub fn proto_to_stash(entry: &proto::StashEntry) -> Result<StashEntry> {
3495    Ok(StashEntry {
3496        oid: Oid::from_bytes(&entry.oid)?,
3497        message: entry.message.clone(),
3498        index: entry.index as usize,
3499        branch: entry.branch.clone(),
3500        timestamp: entry.timestamp,
3501    })
3502}
3503
3504impl MergeDetails {
3505    async fn load(
3506        backend: &Arc<dyn GitRepository>,
3507        status: &SumTree<StatusEntry>,
3508        prev_snapshot: &RepositorySnapshot,
3509    ) -> Result<(MergeDetails, bool)> {
3510        log::debug!("load merge details");
3511        let message = backend.merge_message().await;
3512        let heads = backend
3513            .revparse_batch(vec![
3514                "MERGE_HEAD".into(),
3515                "CHERRY_PICK_HEAD".into(),
3516                "REBASE_HEAD".into(),
3517                "REVERT_HEAD".into(),
3518                "APPLY_HEAD".into(),
3519            ])
3520            .await
3521            .log_err()
3522            .unwrap_or_default()
3523            .into_iter()
3524            .map(|opt| opt.map(SharedString::from))
3525            .collect::<Vec<_>>();
3526        let merge_heads_changed = heads != prev_snapshot.merge.heads;
3527        let conflicted_paths = if merge_heads_changed {
3528            let current_conflicted_paths = TreeSet::from_ordered_entries(
3529                status
3530                    .iter()
3531                    .filter(|entry| entry.status.is_conflicted())
3532                    .map(|entry| entry.repo_path.clone()),
3533            );
3534
3535            // It can happen that we run a scan while a lengthy merge is in progress
3536            // that will eventually result in conflicts, but before those conflicts
3537            // are reported by `git status`. Since for the moment we only care about
3538            // the merge heads state for the purposes of tracking conflicts, don't update
3539            // this state until we see some conflicts.
3540            if heads.iter().any(Option::is_some)
3541                && !prev_snapshot.merge.heads.iter().any(Option::is_some)
3542                && current_conflicted_paths.is_empty()
3543            {
3544                log::debug!("not updating merge heads because no conflicts found");
3545                return Ok((
3546                    MergeDetails {
3547                        message: message.map(SharedString::from),
3548                        ..prev_snapshot.merge.clone()
3549                    },
3550                    false,
3551                ));
3552            }
3553
3554            current_conflicted_paths
3555        } else {
3556            prev_snapshot.merge.conflicted_paths.clone()
3557        };
3558        let details = MergeDetails {
3559            conflicted_paths,
3560            message: message.map(SharedString::from),
3561            heads,
3562        };
3563        Ok((details, merge_heads_changed))
3564    }
3565}
3566
3567impl Repository {
3568    pub fn snapshot(&self) -> RepositorySnapshot {
3569        self.snapshot.clone()
3570    }
3571
3572    pub fn pending_ops(&self) -> impl Iterator<Item = PendingOps> + '_ {
3573        self.pending_ops.iter().cloned()
3574    }
3575
3576    pub fn pending_ops_summary(&self) -> PathSummary<PendingOpsSummary> {
3577        self.pending_ops.summary().clone()
3578    }
3579
3580    pub fn pending_ops_for_path(&self, path: &RepoPath) -> Option<PendingOps> {
3581        self.pending_ops
3582            .get(&PathKey(path.as_ref().clone()), ())
3583            .cloned()
3584    }
3585
3586    fn local(
3587        id: RepositoryId,
3588        work_directory_abs_path: Arc<Path>,
3589        dot_git_abs_path: Arc<Path>,
3590        project_environment: WeakEntity<ProjectEnvironment>,
3591        fs: Arc<dyn Fs>,
3592        git_store: WeakEntity<GitStore>,
3593        cx: &mut Context<Self>,
3594    ) -> Self {
3595        let snapshot =
3596            RepositorySnapshot::empty(id, work_directory_abs_path.clone(), PathStyle::local());
3597        let state = cx
3598            .spawn(async move |_, cx| {
3599                LocalRepositoryState::new(
3600                    work_directory_abs_path,
3601                    dot_git_abs_path,
3602                    project_environment,
3603                    fs,
3604                    cx,
3605                )
3606                .await
3607                .map_err(|err| err.to_string())
3608            })
3609            .shared();
3610        let job_sender = Repository::spawn_local_git_worker(state.clone(), cx);
3611        let state = cx
3612            .spawn(async move |_, _| {
3613                let state = state.await?;
3614                Ok(RepositoryState::Local(state))
3615            })
3616            .shared();
3617
3618        cx.subscribe_self(|this, event: &RepositoryEvent, _| match event {
3619            RepositoryEvent::BranchChanged | RepositoryEvent::MergeHeadsChanged => {
3620                this.initial_graph_data.clear();
3621            }
3622            _ => {}
3623        })
3624        .detach();
3625
3626        Repository {
3627            this: cx.weak_entity(),
3628            git_store,
3629            snapshot,
3630            pending_ops: Default::default(),
3631            repository_state: state,
3632            commit_message_buffer: None,
3633            askpass_delegates: Default::default(),
3634            paths_needing_status_update: Default::default(),
3635            latest_askpass_id: 0,
3636            job_sender,
3637            job_id: 0,
3638            active_jobs: Default::default(),
3639            initial_graph_data: Default::default(),
3640            commit_data: Default::default(),
3641            graph_commit_data_handler: GraphCommitHandlerState::Closed,
3642        }
3643    }
3644
3645    fn remote(
3646        id: RepositoryId,
3647        work_directory_abs_path: Arc<Path>,
3648        path_style: PathStyle,
3649        project_id: ProjectId,
3650        client: AnyProtoClient,
3651        git_store: WeakEntity<GitStore>,
3652        cx: &mut Context<Self>,
3653    ) -> Self {
3654        let snapshot = RepositorySnapshot::empty(id, work_directory_abs_path, path_style);
3655        let repository_state = RemoteRepositoryState { project_id, client };
3656        let job_sender = Self::spawn_remote_git_worker(repository_state.clone(), cx);
3657        let repository_state = Task::ready(Ok(RepositoryState::Remote(repository_state))).shared();
3658        Self {
3659            this: cx.weak_entity(),
3660            snapshot,
3661            commit_message_buffer: None,
3662            git_store,
3663            pending_ops: Default::default(),
3664            paths_needing_status_update: Default::default(),
3665            job_sender,
3666            repository_state,
3667            askpass_delegates: Default::default(),
3668            latest_askpass_id: 0,
3669            active_jobs: Default::default(),
3670            job_id: 0,
3671            initial_graph_data: Default::default(),
3672            commit_data: Default::default(),
3673            graph_commit_data_handler: GraphCommitHandlerState::Closed,
3674        }
3675    }
3676
3677    pub fn git_store(&self) -> Option<Entity<GitStore>> {
3678        self.git_store.upgrade()
3679    }
3680
    /// Queues a job (keyed by `GitJobKey::ReloadBufferDiffBases`) that reloads
    /// the index and HEAD texts for every buffer with diff state in this
    /// repository and applies whatever changed.
    ///
    /// The job runs in three phases:
    /// 1. Collect, for each tracked buffer whose file maps to a path in this
    ///    repository, the currently stored index/HEAD texts of the diffs that
    ///    are still upgradable.
    /// 2. On a background task, load the fresh index/HEAD texts from the
    ///    backend and work out which `DiffBasesChange` (if any) applies.
    /// 3. Back on the git store, forward each change to the downstream client
    ///    (when there is one) and call `diff_bases_changed` on the diff state.
    ///
    /// Only local repositories are supported; for any other state an error is
    /// logged and the job does nothing. The receiver returned by
    /// `send_keyed_job` is discarded.
    fn reload_buffer_diff_bases(&mut self, cx: &mut Context<Self>) {
        let this = cx.weak_entity();
        let git_store = self.git_store.clone();
        let _ = self.send_keyed_job(
            Some(GitJobKey::ReloadBufferDiffBases),
            None,
            |state, mut cx| async move {
                let RepositoryState::Local(LocalRepositoryState { backend, .. }) = state else {
                    log::error!("tried to recompute diffs for a non-local repository");
                    return Ok(());
                };

                // The repository may have been dropped while the job was queued.
                let Some(this) = this.upgrade() else {
                    return Ok(());
                };

                // Phase 1: snapshot the currently stored base texts. Buffers
                // that are gone, have no `File`, or lie outside this
                // repository are skipped by the `?`s below.
                let repo_diff_state_updates = this.update(&mut cx, |this, cx| {
                    git_store.update(cx, |git_store, cx| {
                        git_store
                            .diffs
                            .iter()
                            .filter_map(|(buffer_id, diff_state)| {
                                let buffer_store = git_store.buffer_store.read(cx);
                                let buffer = buffer_store.get(*buffer_id)?;
                                let file = File::from_dyn(buffer.read(cx).file())?;
                                let abs_path = file.worktree.read(cx).absolutize(&file.path);
                                let repo_path = this.abs_path_to_repo_path(&abs_path)?;
                                log::debug!(
                                    "start reload diff bases for repo path {}",
                                    repo_path.as_unix_str()
                                );
                                diff_state.update(cx, |diff_state, _| {
                                    let has_unstaged_diff = diff_state
                                        .unstaged_diff
                                        .as_ref()
                                        .is_some_and(|diff| diff.is_upgradable());
                                    let has_uncommitted_diff = diff_state
                                        .uncommitted_diff
                                        .as_ref()
                                        .is_some_and(|set| set.is_upgradable());

                                    // The outer `Option` of each text records
                                    // whether that diff is still upgradable;
                                    // the inner value is the stored base text.
                                    Some((
                                        buffer,
                                        repo_path,
                                        has_unstaged_diff.then(|| diff_state.index_text.clone()),
                                        has_uncommitted_diff.then(|| diff_state.head_text.clone()),
                                    ))
                                })
                            })
                            .collect::<Vec<_>>()
                    })
                })?;

                // Phase 2: load the fresh texts and diff them against the
                // stored ones on a background task.
                let buffer_diff_base_changes = cx
                    .background_spawn(async move {
                        let mut changes = Vec::new();
                        for (buffer, repo_path, current_index_text, current_head_text) in
                            &repo_diff_state_updates
                        {
                            // Only load a text when the matching diff was
                            // upgradable in phase 1.
                            let index_text = if current_index_text.is_some() {
                                backend.load_index_text(repo_path.clone()).await
                            } else {
                                None
                            };
                            let head_text = if current_head_text.is_some() {
                                backend.load_committed_text(repo_path.clone()).await
                            } else {
                                None
                            };

                            let change =
                                match (current_index_text.as_ref(), current_head_text.as_ref()) {
                                    (Some(current_index), Some(current_head)) => {
                                        let index_changed =
                                            index_text.as_deref() != current_index.as_deref();
                                        let head_changed =
                                            head_text.as_deref() != current_head.as_deref();
                                        if index_changed && head_changed {
                                            // When both changed to identical
                                            // contents, use the single-text
                                            // `SetBoth` variant.
                                            if index_text == head_text {
                                                Some(DiffBasesChange::SetBoth(head_text))
                                            } else {
                                                Some(DiffBasesChange::SetEach {
                                                    index: index_text,
                                                    head: head_text,
                                                })
                                            }
                                        } else if index_changed {
                                            Some(DiffBasesChange::SetIndex(index_text))
                                        } else if head_changed {
                                            Some(DiffBasesChange::SetHead(head_text))
                                        } else {
                                            None
                                        }
                                    }
                                    (Some(current_index), None) => {
                                        let index_changed =
                                            index_text.as_deref() != current_index.as_deref();
                                        index_changed
                                            .then_some(DiffBasesChange::SetIndex(index_text))
                                    }
                                    (None, Some(current_head)) => {
                                        let head_changed =
                                            head_text.as_deref() != current_head.as_deref();
                                        head_changed.then_some(DiffBasesChange::SetHead(head_text))
                                    }
                                    (None, None) => None,
                                };

                            // Buffers without a change are kept too (with
                            // `None`), so phase 3 visits every buffer.
                            changes.push((buffer.clone(), change))
                        }
                        changes
                    })
                    .await;

                // Phase 3: publish and apply the changes.
                git_store.update(&mut cx, |git_store, cx| {
                    for (buffer, diff_bases_change) in buffer_diff_base_changes {
                        let buffer_snapshot = buffer.read(cx).text_snapshot();
                        let buffer_id = buffer_snapshot.remote_id();
                        // The diff state may have been removed in the meantime.
                        let Some(diff_state) = git_store.diffs.get(&buffer_id) else {
                            continue;
                        };

                        let downstream_client = git_store.downstream_client();
                        diff_state.update(cx, |diff_state, cx| {
                            use proto::update_diff_bases::Mode;

                            // Only send when there is both a change and a
                            // downstream client.
                            if let Some((diff_bases_change, (client, project_id))) =
                                diff_bases_change.clone().zip(downstream_client)
                            {
                                let (staged_text, committed_text, mode) = match diff_bases_change {
                                    DiffBasesChange::SetIndex(index) => {
                                        (index, None, Mode::IndexOnly)
                                    }
                                    DiffBasesChange::SetHead(head) => (None, head, Mode::HeadOnly),
                                    DiffBasesChange::SetEach { index, head } => {
                                        (index, head, Mode::IndexAndHead)
                                    }
                                    // For `SetBoth` the text travels only in
                                    // `committed_text`.
                                    DiffBasesChange::SetBoth(text) => {
                                        (None, text, Mode::IndexMatchesHead)
                                    }
                                };
                                client
                                    .send(proto::UpdateDiffBases {
                                        project_id: project_id.to_proto(),
                                        buffer_id: buffer_id.to_proto(),
                                        staged_text,
                                        committed_text,
                                        mode: mode as i32,
                                    })
                                    .log_err();
                            }

                            // Called even when `diff_bases_change` is `None`.
                            diff_state.diff_bases_changed(buffer_snapshot, diff_bases_change, cx);
                        });
                    }
                })
            },
        );
    }
3840
3841    pub fn send_job<F, Fut, R>(
3842        &mut self,
3843        status: Option<SharedString>,
3844        job: F,
3845    ) -> oneshot::Receiver<R>
3846    where
3847        F: FnOnce(RepositoryState, AsyncApp) -> Fut + 'static,
3848        Fut: Future<Output = R> + 'static,
3849        R: Send + 'static,
3850    {
3851        self.send_keyed_job(None, status, job)
3852    }
3853
3854    fn send_keyed_job<F, Fut, R>(
3855        &mut self,
3856        key: Option<GitJobKey>,
3857        status: Option<SharedString>,
3858        job: F,
3859    ) -> oneshot::Receiver<R>
3860    where
3861        F: FnOnce(RepositoryState, AsyncApp) -> Fut + 'static,
3862        Fut: Future<Output = R> + 'static,
3863        R: Send + 'static,
3864    {
3865        let (result_tx, result_rx) = futures::channel::oneshot::channel();
3866        let job_id = post_inc(&mut self.job_id);
3867        let this = self.this.clone();
3868        self.job_sender
3869            .unbounded_send(GitJob {
3870                key,
3871                job: Box::new(move |state, cx: &mut AsyncApp| {
3872                    let job = job(state, cx.clone());
3873                    cx.spawn(async move |cx| {
3874                        if let Some(s) = status.clone() {
3875                            this.update(cx, |this, cx| {
3876                                this.active_jobs.insert(
3877                                    job_id,
3878                                    JobInfo {
3879                                        start: Instant::now(),
3880                                        message: s.clone(),
3881                                    },
3882                                );
3883
3884                                cx.notify();
3885                            })
3886                            .ok();
3887                        }
3888                        let result = job.await;
3889
3890                        this.update(cx, |this, cx| {
3891                            this.active_jobs.remove(&job_id);
3892                            cx.notify();
3893                        })
3894                        .ok();
3895
3896                        result_tx.send(result).ok();
3897                    })
3898                }),
3899            })
3900            .ok();
3901        result_rx
3902    }
3903
3904    pub fn set_as_active_repository(&self, cx: &mut Context<Self>) {
3905        let Some(git_store) = self.git_store.upgrade() else {
3906            return;
3907        };
3908        let entity = cx.entity();
3909        git_store.update(cx, |git_store, cx| {
3910            let Some((&id, _)) = git_store
3911                .repositories
3912                .iter()
3913                .find(|(_, handle)| *handle == &entity)
3914            else {
3915                return;
3916            };
3917            git_store.active_repo_id = Some(id);
3918            cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
3919        });
3920    }
3921
3922    pub fn cached_status(&self) -> impl '_ + Iterator<Item = StatusEntry> {
3923        self.snapshot.status()
3924    }
3925
3926    pub fn cached_stash(&self) -> GitStash {
3927        self.snapshot.stash_entries.clone()
3928    }
3929
3930    pub fn repo_path_to_project_path(&self, path: &RepoPath, cx: &App) -> Option<ProjectPath> {
3931        let git_store = self.git_store.upgrade()?;
3932        let worktree_store = git_store.read(cx).worktree_store.read(cx);
3933        let abs_path = self.snapshot.repo_path_to_abs_path(path);
3934        let abs_path = SanitizedPath::new(&abs_path);
3935        let (worktree, relative_path) = worktree_store.find_worktree(abs_path, cx)?;
3936        Some(ProjectPath {
3937            worktree_id: worktree.read(cx).id(),
3938            path: relative_path,
3939        })
3940    }
3941
3942    pub fn project_path_to_repo_path(&self, path: &ProjectPath, cx: &App) -> Option<RepoPath> {
3943        let git_store = self.git_store.upgrade()?;
3944        let worktree_store = git_store.read(cx).worktree_store.read(cx);
3945        let abs_path = worktree_store.absolutize(path, cx)?;
3946        self.snapshot.abs_path_to_repo_path(&abs_path)
3947    }
3948
3949    pub fn contains_sub_repo(&self, other: &Entity<Self>, cx: &App) -> bool {
3950        other
3951            .read(cx)
3952            .snapshot
3953            .work_directory_abs_path
3954            .starts_with(&self.snapshot.work_directory_abs_path)
3955    }
3956
3957    pub fn open_commit_buffer(
3958        &mut self,
3959        languages: Option<Arc<LanguageRegistry>>,
3960        buffer_store: Entity<BufferStore>,
3961        cx: &mut Context<Self>,
3962    ) -> Task<Result<Entity<Buffer>>> {
3963        let id = self.id;
3964        if let Some(buffer) = self.commit_message_buffer.clone() {
3965            return Task::ready(Ok(buffer));
3966        }
3967        let this = cx.weak_entity();
3968
3969        let rx = self.send_job(None, move |state, mut cx| async move {
3970            let Some(this) = this.upgrade() else {
3971                bail!("git store was dropped");
3972            };
3973            match state {
3974                RepositoryState::Local(..) => {
3975                    this.update(&mut cx, |_, cx| {
3976                        Self::open_local_commit_buffer(languages, buffer_store, cx)
3977                    })
3978                    .await
3979                }
3980                RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
3981                    let request = client.request(proto::OpenCommitMessageBuffer {
3982                        project_id: project_id.0,
3983                        repository_id: id.to_proto(),
3984                    });
3985                    let response = request.await.context("requesting to open commit buffer")?;
3986                    let buffer_id = BufferId::new(response.buffer_id)?;
3987                    let buffer = buffer_store
3988                        .update(&mut cx, |buffer_store, cx| {
3989                            buffer_store.wait_for_remote_buffer(buffer_id, cx)
3990                        })
3991                        .await?;
3992                    if let Some(language_registry) = languages {
3993                        let git_commit_language =
3994                            language_registry.language_for_name("Git Commit").await?;
3995                        buffer.update(&mut cx, |buffer, cx| {
3996                            buffer.set_language(Some(git_commit_language), cx);
3997                        });
3998                    }
3999                    this.update(&mut cx, |this, _| {
4000                        this.commit_message_buffer = Some(buffer.clone());
4001                    });
4002                    Ok(buffer)
4003                }
4004            }
4005        });
4006
4007        cx.spawn(|_, _: &mut AsyncApp| async move { rx.await? })
4008    }
4009
4010    fn open_local_commit_buffer(
4011        language_registry: Option<Arc<LanguageRegistry>>,
4012        buffer_store: Entity<BufferStore>,
4013        cx: &mut Context<Self>,
4014    ) -> Task<Result<Entity<Buffer>>> {
4015        cx.spawn(async move |repository, cx| {
4016            let git_commit_language = match language_registry {
4017                Some(language_registry) => {
4018                    Some(language_registry.language_for_name("Git Commit").await?)
4019                }
4020                None => None,
4021            };
4022            let buffer = buffer_store
4023                .update(cx, |buffer_store, cx| {
4024                    buffer_store.create_buffer(git_commit_language, false, cx)
4025                })
4026                .await?;
4027
4028            repository.update(cx, |repository, _| {
4029                repository.commit_message_buffer = Some(buffer.clone());
4030            })?;
4031            Ok(buffer)
4032        })
4033    }
4034
4035    pub fn checkout_files(
4036        &mut self,
4037        commit: &str,
4038        paths: Vec<RepoPath>,
4039        cx: &mut Context<Self>,
4040    ) -> Task<Result<()>> {
4041        let commit = commit.to_string();
4042        let id = self.id;
4043
4044        self.spawn_job_with_tracking(
4045            paths.clone(),
4046            pending_op::GitStatus::Reverted,
4047            cx,
4048            async move |this, cx| {
4049                this.update(cx, |this, _cx| {
4050                    this.send_job(
4051                        Some(format!("git checkout {}", commit).into()),
4052                        move |git_repo, _| async move {
4053                            match git_repo {
4054                                RepositoryState::Local(LocalRepositoryState {
4055                                    backend,
4056                                    environment,
4057                                    ..
4058                                }) => {
4059                                    backend
4060                                        .checkout_files(commit, paths, environment.clone())
4061                                        .await
4062                                }
4063                                RepositoryState::Remote(RemoteRepositoryState {
4064                                    project_id,
4065                                    client,
4066                                }) => {
4067                                    client
4068                                        .request(proto::GitCheckoutFiles {
4069                                            project_id: project_id.0,
4070                                            repository_id: id.to_proto(),
4071                                            commit,
4072                                            paths: paths
4073                                                .into_iter()
4074                                                .map(|p| p.to_proto())
4075                                                .collect(),
4076                                        })
4077                                        .await?;
4078
4079                                    Ok(())
4080                                }
4081                            }
4082                        },
4083                    )
4084                })?
4085                .await?
4086            },
4087        )
4088    }
4089
4090    pub fn reset(
4091        &mut self,
4092        commit: String,
4093        reset_mode: ResetMode,
4094        _cx: &mut App,
4095    ) -> oneshot::Receiver<Result<()>> {
4096        let id = self.id;
4097
4098        self.send_job(None, move |git_repo, _| async move {
4099            match git_repo {
4100                RepositoryState::Local(LocalRepositoryState {
4101                    backend,
4102                    environment,
4103                    ..
4104                }) => backend.reset(commit, reset_mode, environment).await,
4105                RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
4106                    client
4107                        .request(proto::GitReset {
4108                            project_id: project_id.0,
4109                            repository_id: id.to_proto(),
4110                            commit,
4111                            mode: match reset_mode {
4112                                ResetMode::Soft => git_reset::ResetMode::Soft.into(),
4113                                ResetMode::Mixed => git_reset::ResetMode::Mixed.into(),
4114                            },
4115                        })
4116                        .await?;
4117
4118                    Ok(())
4119                }
4120            }
4121        })
4122    }
4123
4124    pub fn show(&mut self, commit: String) -> oneshot::Receiver<Result<CommitDetails>> {
4125        let id = self.id;
4126        self.send_job(None, move |git_repo, _cx| async move {
4127            match git_repo {
4128                RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
4129                    backend.show(commit).await
4130                }
4131                RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
4132                    let resp = client
4133                        .request(proto::GitShow {
4134                            project_id: project_id.0,
4135                            repository_id: id.to_proto(),
4136                            commit,
4137                        })
4138                        .await?;
4139
4140                    Ok(CommitDetails {
4141                        sha: resp.sha.into(),
4142                        message: resp.message.into(),
4143                        commit_timestamp: resp.commit_timestamp,
4144                        author_email: resp.author_email.into(),
4145                        author_name: resp.author_name.into(),
4146                    })
4147                }
4148            }
4149        })
4150    }
4151
4152    pub fn load_commit_diff(&mut self, commit: String) -> oneshot::Receiver<Result<CommitDiff>> {
4153        let id = self.id;
4154        self.send_job(None, move |git_repo, cx| async move {
4155            match git_repo {
4156                RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
4157                    backend.load_commit(commit, cx).await
4158                }
4159                RepositoryState::Remote(RemoteRepositoryState {
4160                    client, project_id, ..
4161                }) => {
4162                    let response = client
4163                        .request(proto::LoadCommitDiff {
4164                            project_id: project_id.0,
4165                            repository_id: id.to_proto(),
4166                            commit,
4167                        })
4168                        .await?;
4169                    Ok(CommitDiff {
4170                        files: response
4171                            .files
4172                            .into_iter()
4173                            .map(|file| {
4174                                Ok(CommitFile {
4175                                    path: RepoPath::from_proto(&file.path)?,
4176                                    old_text: file.old_text,
4177                                    new_text: file.new_text,
4178                                    is_binary: file.is_binary,
4179                                })
4180                            })
4181                            .collect::<Result<Vec<_>>>()?,
4182                    })
4183                }
4184            }
4185        })
4186    }
4187
4188    pub fn file_history(
4189        &mut self,
4190        path: RepoPath,
4191    ) -> oneshot::Receiver<Result<git::repository::FileHistory>> {
4192        self.file_history_paginated(path, 0, None)
4193    }
4194
4195    pub fn file_history_paginated(
4196        &mut self,
4197        path: RepoPath,
4198        skip: usize,
4199        limit: Option<usize>,
4200    ) -> oneshot::Receiver<Result<git::repository::FileHistory>> {
4201        let id = self.id;
4202        self.send_job(None, move |git_repo, _cx| async move {
4203            match git_repo {
4204                RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
4205                    backend.file_history_paginated(path, skip, limit).await
4206                }
4207                RepositoryState::Remote(RemoteRepositoryState { client, project_id }) => {
4208                    let response = client
4209                        .request(proto::GitFileHistory {
4210                            project_id: project_id.0,
4211                            repository_id: id.to_proto(),
4212                            path: path.to_proto(),
4213                            skip: skip as u64,
4214                            limit: limit.map(|l| l as u64),
4215                        })
4216                        .await?;
4217                    Ok(git::repository::FileHistory {
4218                        entries: response
4219                            .entries
4220                            .into_iter()
4221                            .map(|entry| git::repository::FileHistoryEntry {
4222                                sha: entry.sha.into(),
4223                                subject: entry.subject.into(),
4224                                message: entry.message.into(),
4225                                commit_timestamp: entry.commit_timestamp,
4226                                author_name: entry.author_name.into(),
4227                                author_email: entry.author_email.into(),
4228                            })
4229                            .collect(),
4230                        path: RepoPath::from_proto(&response.path)?,
4231                    })
4232                }
4233            }
4234        })
4235    }
4236
4237    pub fn graph_data(
4238        &mut self,
4239        log_source: LogSource,
4240        log_order: LogOrder,
4241        range: Range<usize>,
4242        cx: &mut Context<Self>,
4243    ) -> &[Arc<InitialGraphCommitData>] {
4244        let initial_commit_data = &self
4245            .initial_graph_data
4246            .entry((log_order, log_source.clone()))
4247            .or_insert_with(|| {
4248                let state = self.repository_state.clone();
4249                let log_source = log_source.clone();
4250                (
4251                    cx.spawn(async move |repository, cx| {
4252                        let state = state.await;
4253                        match state {
4254                            Ok(RepositoryState::Local(LocalRepositoryState {
4255                                backend, ..
4256                            })) => {
4257                                Self::local_git_graph_data(
4258                                    repository, backend, log_source, log_order, cx,
4259                                )
4260                                .await
4261                            }
4262                            Ok(RepositoryState::Remote(_)) => {
4263                                Err("Git graph is not supported for collab yet".into())
4264                            }
4265                            Err(e) => Err(SharedString::from(e)),
4266                        }
4267                    }),
4268                    vec![],
4269                )
4270            })
4271            .1;
4272
4273        let max_start = initial_commit_data.len().saturating_sub(1);
4274        let max_end = initial_commit_data.len();
4275        &initial_commit_data[range.start.min(max_start)..range.end.min(max_end)]
4276    }
4277
4278    async fn local_git_graph_data(
4279        this: WeakEntity<Self>,
4280        backend: Arc<dyn GitRepository>,
4281        log_source: LogSource,
4282        log_order: LogOrder,
4283        cx: &mut AsyncApp,
4284    ) -> Result<(), SharedString> {
4285        let (request_tx, request_rx) =
4286            smol::channel::unbounded::<Vec<Arc<InitialGraphCommitData>>>();
4287
4288        let task = cx.background_executor().spawn({
4289            let log_source = log_source.clone();
4290            async move {
4291                backend
4292                    .initial_graph_data(log_source, log_order, request_tx)
4293                    .await
4294                    .map_err(|err| SharedString::from(err.to_string()))
4295            }
4296        });
4297
4298        let graph_data_key = (log_order, log_source.clone());
4299
4300        while let Ok(initial_graph_commit_data) = request_rx.recv().await {
4301            this.update(cx, |repository, cx| {
4302                let graph_data = repository
4303                    .initial_graph_data
4304                    .get_mut(&graph_data_key)
4305                    .map(|(_, graph_data)| graph_data);
4306                debug_assert!(
4307                    graph_data.is_some(),
4308                    "This task should be dropped if data doesn't exist"
4309                );
4310
4311                if let Some(graph_data) = graph_data {
4312                    graph_data.extend(initial_graph_commit_data);
4313                    cx.emit(RepositoryEvent::GitGraphCountUpdated(
4314                        graph_data_key.clone(),
4315                        graph_data.len(),
4316                    ));
4317                }
4318            })
4319            .ok();
4320        }
4321
4322        task.await?;
4323
4324        Ok(())
4325    }
4326
4327    pub fn fetch_commit_data(&mut self, sha: Oid, cx: &mut Context<Self>) -> &CommitDataState {
4328        if !self.commit_data.contains_key(&sha) {
4329            match &self.graph_commit_data_handler {
4330                GraphCommitHandlerState::Open(handler) => {
4331                    if handler.commit_data_request.try_send(sha).is_ok() {
4332                        let old_value = self.commit_data.insert(sha, CommitDataState::Loading);
4333                        debug_assert!(old_value.is_none(), "We should never overwrite commit data");
4334                    }
4335                }
4336                GraphCommitHandlerState::Closed => {
4337                    self.open_graph_commit_data_handler(cx);
4338                }
4339                GraphCommitHandlerState::Starting => {}
4340            }
4341        }
4342
4343        self.commit_data
4344            .get(&sha)
4345            .unwrap_or(&CommitDataState::Loading)
4346    }
4347
4348    fn open_graph_commit_data_handler(&mut self, cx: &mut Context<Self>) {
4349        self.graph_commit_data_handler = GraphCommitHandlerState::Starting;
4350
4351        let state = self.repository_state.clone();
4352        let (result_tx, result_rx) = smol::channel::bounded::<(Oid, GraphCommitData)>(64);
4353        let (request_tx, request_rx) = smol::channel::unbounded::<Oid>();
4354
4355        let foreground_task = cx.spawn(async move |this, cx| {
4356            while let Ok((sha, commit_data)) = result_rx.recv().await {
4357                let result = this.update(cx, |this, cx| {
4358                    let old_value = this
4359                        .commit_data
4360                        .insert(sha, CommitDataState::Loaded(Arc::new(commit_data)));
4361                    debug_assert!(
4362                        !matches!(old_value, Some(CommitDataState::Loaded(_))),
4363                        "We should never overwrite commit data"
4364                    );
4365
4366                    cx.notify();
4367                });
4368                if result.is_err() {
4369                    break;
4370                }
4371            }
4372
4373            this.update(cx, |this, _cx| {
4374                this.graph_commit_data_handler = GraphCommitHandlerState::Closed;
4375            })
4376            .ok();
4377        });
4378
4379        let request_tx_for_handler = request_tx;
4380        let background_executor = cx.background_executor().clone();
4381
4382        cx.background_spawn(async move {
4383            let backend = match state.await {
4384                Ok(RepositoryState::Local(LocalRepositoryState { backend, .. })) => backend,
4385                Ok(RepositoryState::Remote(_)) => {
4386                    log::error!("commit_data_reader not supported for remote repositories");
4387                    return;
4388                }
4389                Err(error) => {
4390                    log::error!("failed to get repository state: {error}");
4391                    return;
4392                }
4393            };
4394
4395            let reader = match backend.commit_data_reader() {
4396                Ok(reader) => reader,
4397                Err(error) => {
4398                    log::error!("failed to create commit data reader: {error:?}");
4399                    return;
4400                }
4401            };
4402
4403            loop {
4404                let timeout = background_executor.timer(std::time::Duration::from_secs(10));
4405
4406                futures::select_biased! {
4407                    sha = futures::FutureExt::fuse(request_rx.recv()) => {
4408                        let Ok(sha) = sha else {
4409                            break;
4410                        };
4411
4412                        match reader.read(sha).await {
4413                            Ok(commit_data) => {
4414                                if result_tx.send((sha, commit_data)).await.is_err() {
4415                                    break;
4416                                }
4417                            }
4418                            Err(error) => {
4419                                log::error!("failed to read commit data for {sha}: {error:?}");
4420                            }
4421                        }
4422                    }
4423                    _ = futures::FutureExt::fuse(timeout) => {
4424                        break;
4425                    }
4426                }
4427            }
4428
4429            drop(result_tx);
4430        })
4431        .detach();
4432
4433        self.graph_commit_data_handler = GraphCommitHandlerState::Open(GraphCommitDataHandler {
4434            _task: foreground_task,
4435            commit_data_request: request_tx_for_handler,
4436        });
4437    }
4438
4439    fn buffer_store(&self, cx: &App) -> Option<Entity<BufferStore>> {
4440        Some(self.git_store.upgrade()?.read(cx).buffer_store.clone())
4441    }
4442
4443    fn save_buffers<'a>(
4444        &self,
4445        entries: impl IntoIterator<Item = &'a RepoPath>,
4446        cx: &mut Context<Self>,
4447    ) -> Vec<Task<anyhow::Result<()>>> {
4448        let mut save_futures = Vec::new();
4449        if let Some(buffer_store) = self.buffer_store(cx) {
4450            buffer_store.update(cx, |buffer_store, cx| {
4451                for path in entries {
4452                    let Some(project_path) = self.repo_path_to_project_path(path, cx) else {
4453                        continue;
4454                    };
4455                    if let Some(buffer) = buffer_store.get_by_path(&project_path)
4456                        && buffer
4457                            .read(cx)
4458                            .file()
4459                            .is_some_and(|file| file.disk_state().exists())
4460                        && buffer.read(cx).has_unsaved_edits()
4461                    {
4462                        save_futures.push(buffer_store.save_buffer(buffer, cx));
4463                    }
4464                }
4465            })
4466        }
4467        save_futures
4468    }
4469
4470    pub fn stage_entries(
4471        &mut self,
4472        entries: Vec<RepoPath>,
4473        cx: &mut Context<Self>,
4474    ) -> Task<anyhow::Result<()>> {
4475        self.stage_or_unstage_entries(true, entries, cx)
4476    }
4477
4478    pub fn unstage_entries(
4479        &mut self,
4480        entries: Vec<RepoPath>,
4481        cx: &mut Context<Self>,
4482    ) -> Task<anyhow::Result<()>> {
4483        self.stage_or_unstage_entries(false, entries, cx)
4484    }
4485
4486    fn stage_or_unstage_entries(
4487        &mut self,
4488        stage: bool,
4489        entries: Vec<RepoPath>,
4490        cx: &mut Context<Self>,
4491    ) -> Task<anyhow::Result<()>> {
4492        if entries.is_empty() {
4493            return Task::ready(Ok(()));
4494        }
4495        let Some(git_store) = self.git_store.upgrade() else {
4496            return Task::ready(Ok(()));
4497        };
4498        let id = self.id;
4499        let save_tasks = self.save_buffers(&entries, cx);
4500        let paths = entries
4501            .iter()
4502            .map(|p| p.as_unix_str())
4503            .collect::<Vec<_>>()
4504            .join(" ");
4505        let status = if stage {
4506            format!("git add {paths}")
4507        } else {
4508            format!("git reset {paths}")
4509        };
4510        let job_key = GitJobKey::WriteIndex(entries.clone());
4511
4512        self.spawn_job_with_tracking(
4513            entries.clone(),
4514            if stage {
4515                pending_op::GitStatus::Staged
4516            } else {
4517                pending_op::GitStatus::Unstaged
4518            },
4519            cx,
4520            async move |this, cx| {
4521                for save_task in save_tasks {
4522                    save_task.await?;
4523                }
4524
4525                this.update(cx, |this, cx| {
4526                    let weak_this = cx.weak_entity();
4527                    this.send_keyed_job(
4528                        Some(job_key),
4529                        Some(status.into()),
4530                        move |git_repo, mut cx| async move {
4531                            let hunk_staging_operation_counts = weak_this
4532                                .update(&mut cx, |this, cx| {
4533                                    let mut hunk_staging_operation_counts = HashMap::default();
4534                                    for path in &entries {
4535                                        let Some(project_path) =
4536                                            this.repo_path_to_project_path(path, cx)
4537                                        else {
4538                                            continue;
4539                                        };
4540                                        let Some(buffer) = git_store
4541                                            .read(cx)
4542                                            .buffer_store
4543                                            .read(cx)
4544                                            .get_by_path(&project_path)
4545                                        else {
4546                                            continue;
4547                                        };
4548                                        let Some(diff_state) = git_store
4549                                            .read(cx)
4550                                            .diffs
4551                                            .get(&buffer.read(cx).remote_id())
4552                                            .cloned()
4553                                        else {
4554                                            continue;
4555                                        };
4556                                        let Some(uncommitted_diff) =
4557                                            diff_state.read(cx).uncommitted_diff.as_ref().and_then(
4558                                                |uncommitted_diff| uncommitted_diff.upgrade(),
4559                                            )
4560                                        else {
4561                                            continue;
4562                                        };
4563                                        let buffer_snapshot = buffer.read(cx).text_snapshot();
4564                                        let file_exists = buffer
4565                                            .read(cx)
4566                                            .file()
4567                                            .is_some_and(|file| file.disk_state().exists());
4568                                        let hunk_staging_operation_count =
4569                                            diff_state.update(cx, |diff_state, cx| {
4570                                                uncommitted_diff.update(
4571                                                    cx,
4572                                                    |uncommitted_diff, cx| {
4573                                                        uncommitted_diff
4574                                                            .stage_or_unstage_all_hunks(
4575                                                                stage,
4576                                                                &buffer_snapshot,
4577                                                                file_exists,
4578                                                                cx,
4579                                                            );
4580                                                    },
4581                                                );
4582
4583                                                diff_state.hunk_staging_operation_count += 1;
4584                                                diff_state.hunk_staging_operation_count
4585                                            });
4586                                        hunk_staging_operation_counts.insert(
4587                                            diff_state.downgrade(),
4588                                            hunk_staging_operation_count,
4589                                        );
4590                                    }
4591                                    hunk_staging_operation_counts
4592                                })
4593                                .unwrap_or_default();
4594
4595                            let result = match git_repo {
4596                                RepositoryState::Local(LocalRepositoryState {
4597                                    backend,
4598                                    environment,
4599                                    ..
4600                                }) => {
4601                                    if stage {
4602                                        backend.stage_paths(entries, environment.clone()).await
4603                                    } else {
4604                                        backend.unstage_paths(entries, environment.clone()).await
4605                                    }
4606                                }
4607                                RepositoryState::Remote(RemoteRepositoryState {
4608                                    project_id,
4609                                    client,
4610                                }) => {
4611                                    if stage {
4612                                        client
4613                                            .request(proto::Stage {
4614                                                project_id: project_id.0,
4615                                                repository_id: id.to_proto(),
4616                                                paths: entries
4617                                                    .into_iter()
4618                                                    .map(|repo_path| repo_path.to_proto())
4619                                                    .collect(),
4620                                            })
4621                                            .await
4622                                            .context("sending stage request")
4623                                            .map(|_| ())
4624                                    } else {
4625                                        client
4626                                            .request(proto::Unstage {
4627                                                project_id: project_id.0,
4628                                                repository_id: id.to_proto(),
4629                                                paths: entries
4630                                                    .into_iter()
4631                                                    .map(|repo_path| repo_path.to_proto())
4632                                                    .collect(),
4633                                            })
4634                                            .await
4635                                            .context("sending unstage request")
4636                                            .map(|_| ())
4637                                    }
4638                                }
4639                            };
4640
4641                            for (diff_state, hunk_staging_operation_count) in
4642                                hunk_staging_operation_counts
4643                            {
4644                                diff_state
4645                                    .update(&mut cx, |diff_state, cx| {
4646                                        if result.is_ok() {
4647                                            diff_state.hunk_staging_operation_count_as_of_write =
4648                                                hunk_staging_operation_count;
4649                                        } else if let Some(uncommitted_diff) =
4650                                            &diff_state.uncommitted_diff
4651                                        {
4652                                            uncommitted_diff
4653                                                .update(cx, |uncommitted_diff, cx| {
4654                                                    uncommitted_diff.clear_pending_hunks(cx);
4655                                                })
4656                                                .ok();
4657                                        }
4658                                    })
4659                                    .ok();
4660                            }
4661
4662                            result
4663                        },
4664                    )
4665                })?
4666                .await?
4667            },
4668        )
4669    }
4670
4671    pub fn stage_all(&mut self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
4672        let to_stage = self
4673            .cached_status()
4674            .filter_map(|entry| {
4675                if let Some(ops) = self.pending_ops_for_path(&entry.repo_path) {
4676                    if ops.staging() || ops.staged() {
4677                        None
4678                    } else {
4679                        Some(entry.repo_path)
4680                    }
4681                } else if entry.status.staging().is_fully_staged() {
4682                    None
4683                } else {
4684                    Some(entry.repo_path)
4685                }
4686            })
4687            .collect();
4688        self.stage_or_unstage_entries(true, to_stage, cx)
4689    }
4690
4691    pub fn unstage_all(&mut self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
4692        let to_unstage = self
4693            .cached_status()
4694            .filter_map(|entry| {
4695                if let Some(ops) = self.pending_ops_for_path(&entry.repo_path) {
4696                    if !ops.staging() && !ops.staged() {
4697                        None
4698                    } else {
4699                        Some(entry.repo_path)
4700                    }
4701                } else if entry.status.staging().is_fully_unstaged() {
4702                    None
4703                } else {
4704                    Some(entry.repo_path)
4705                }
4706            })
4707            .collect();
4708        self.stage_or_unstage_entries(false, to_unstage, cx)
4709    }
4710
4711    pub fn stash_all(&mut self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
4712        let to_stash = self.cached_status().map(|entry| entry.repo_path).collect();
4713
4714        self.stash_entries(to_stash, cx)
4715    }
4716
4717    pub fn stash_entries(
4718        &mut self,
4719        entries: Vec<RepoPath>,
4720        cx: &mut Context<Self>,
4721    ) -> Task<anyhow::Result<()>> {
4722        let id = self.id;
4723
4724        cx.spawn(async move |this, cx| {
4725            this.update(cx, |this, _| {
4726                this.send_job(None, move |git_repo, _cx| async move {
4727                    match git_repo {
4728                        RepositoryState::Local(LocalRepositoryState {
4729                            backend,
4730                            environment,
4731                            ..
4732                        }) => backend.stash_paths(entries, environment).await,
4733                        RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
4734                            client
4735                                .request(proto::Stash {
4736                                    project_id: project_id.0,
4737                                    repository_id: id.to_proto(),
4738                                    paths: entries
4739                                        .into_iter()
4740                                        .map(|repo_path| repo_path.to_proto())
4741                                        .collect(),
4742                                })
4743                                .await
4744                                .context("sending stash request")?;
4745                            Ok(())
4746                        }
4747                    }
4748                })
4749            })?
4750            .await??;
4751            Ok(())
4752        })
4753    }
4754
4755    pub fn stash_pop(
4756        &mut self,
4757        index: Option<usize>,
4758        cx: &mut Context<Self>,
4759    ) -> Task<anyhow::Result<()>> {
4760        let id = self.id;
4761        cx.spawn(async move |this, cx| {
4762            this.update(cx, |this, _| {
4763                this.send_job(None, move |git_repo, _cx| async move {
4764                    match git_repo {
4765                        RepositoryState::Local(LocalRepositoryState {
4766                            backend,
4767                            environment,
4768                            ..
4769                        }) => backend.stash_pop(index, environment).await,
4770                        RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
4771                            client
4772                                .request(proto::StashPop {
4773                                    project_id: project_id.0,
4774                                    repository_id: id.to_proto(),
4775                                    stash_index: index.map(|i| i as u64),
4776                                })
4777                                .await
4778                                .context("sending stash pop request")?;
4779                            Ok(())
4780                        }
4781                    }
4782                })
4783            })?
4784            .await??;
4785            Ok(())
4786        })
4787    }
4788
4789    pub fn stash_apply(
4790        &mut self,
4791        index: Option<usize>,
4792        cx: &mut Context<Self>,
4793    ) -> Task<anyhow::Result<()>> {
4794        let id = self.id;
4795        cx.spawn(async move |this, cx| {
4796            this.update(cx, |this, _| {
4797                this.send_job(None, move |git_repo, _cx| async move {
4798                    match git_repo {
4799                        RepositoryState::Local(LocalRepositoryState {
4800                            backend,
4801                            environment,
4802                            ..
4803                        }) => backend.stash_apply(index, environment).await,
4804                        RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
4805                            client
4806                                .request(proto::StashApply {
4807                                    project_id: project_id.0,
4808                                    repository_id: id.to_proto(),
4809                                    stash_index: index.map(|i| i as u64),
4810                                })
4811                                .await
4812                                .context("sending stash apply request")?;
4813                            Ok(())
4814                        }
4815                    }
4816                })
4817            })?
4818            .await??;
4819            Ok(())
4820        })
4821    }
4822
4823    pub fn stash_drop(
4824        &mut self,
4825        index: Option<usize>,
4826        cx: &mut Context<Self>,
4827    ) -> oneshot::Receiver<anyhow::Result<()>> {
4828        let id = self.id;
4829        let updates_tx = self
4830            .git_store()
4831            .and_then(|git_store| match &git_store.read(cx).state {
4832                GitStoreState::Local { downstream, .. } => downstream
4833                    .as_ref()
4834                    .map(|downstream| downstream.updates_tx.clone()),
4835                _ => None,
4836            });
4837        let this = cx.weak_entity();
4838        self.send_job(None, move |git_repo, mut cx| async move {
4839            match git_repo {
4840                RepositoryState::Local(LocalRepositoryState {
4841                    backend,
4842                    environment,
4843                    ..
4844                }) => {
4845                    // TODO would be nice to not have to do this manually
4846                    let result = backend.stash_drop(index, environment).await;
4847                    if result.is_ok()
4848                        && let Ok(stash_entries) = backend.stash_entries().await
4849                    {
4850                        let snapshot = this.update(&mut cx, |this, cx| {
4851                            this.snapshot.stash_entries = stash_entries;
4852                            cx.emit(RepositoryEvent::StashEntriesChanged);
4853                            this.snapshot.clone()
4854                        })?;
4855                        if let Some(updates_tx) = updates_tx {
4856                            updates_tx
4857                                .unbounded_send(DownstreamUpdate::UpdateRepository(snapshot))
4858                                .ok();
4859                        }
4860                    }
4861
4862                    result
4863                }
4864                RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
4865                    client
4866                        .request(proto::StashDrop {
4867                            project_id: project_id.0,
4868                            repository_id: id.to_proto(),
4869                            stash_index: index.map(|i| i as u64),
4870                        })
4871                        .await
4872                        .context("sending stash pop request")?;
4873                    Ok(())
4874                }
4875            }
4876        })
4877    }
4878
4879    pub fn run_hook(&mut self, hook: RunHook, _cx: &mut App) -> oneshot::Receiver<Result<()>> {
4880        let id = self.id;
4881        self.send_job(
4882            Some(format!("git hook {}", hook.as_str()).into()),
4883            move |git_repo, _cx| async move {
4884                match git_repo {
4885                    RepositoryState::Local(LocalRepositoryState {
4886                        backend,
4887                        environment,
4888                        ..
4889                    }) => backend.run_hook(hook, environment.clone()).await,
4890                    RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
4891                        client
4892                            .request(proto::RunGitHook {
4893                                project_id: project_id.0,
4894                                repository_id: id.to_proto(),
4895                                hook: hook.to_proto(),
4896                            })
4897                            .await?;
4898
4899                        Ok(())
4900                    }
4901                }
4902            },
4903        )
4904    }
4905
4906    pub fn commit(
4907        &mut self,
4908        message: SharedString,
4909        name_and_email: Option<(SharedString, SharedString)>,
4910        options: CommitOptions,
4911        askpass: AskPassDelegate,
4912        cx: &mut App,
4913    ) -> oneshot::Receiver<Result<()>> {
4914        let id = self.id;
4915        let askpass_delegates = self.askpass_delegates.clone();
4916        let askpass_id = util::post_inc(&mut self.latest_askpass_id);
4917
4918        let rx = self.run_hook(RunHook::PreCommit, cx);
4919
4920        self.send_job(Some("git commit".into()), move |git_repo, _cx| async move {
4921            rx.await??;
4922
4923            match git_repo {
4924                RepositoryState::Local(LocalRepositoryState {
4925                    backend,
4926                    environment,
4927                    ..
4928                }) => {
4929                    backend
4930                        .commit(message, name_and_email, options, askpass, environment)
4931                        .await
4932                }
4933                RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
4934                    askpass_delegates.lock().insert(askpass_id, askpass);
4935                    let _defer = util::defer(|| {
4936                        let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
4937                        debug_assert!(askpass_delegate.is_some());
4938                    });
4939                    let (name, email) = name_and_email.unzip();
4940                    client
4941                        .request(proto::Commit {
4942                            project_id: project_id.0,
4943                            repository_id: id.to_proto(),
4944                            message: String::from(message),
4945                            name: name.map(String::from),
4946                            email: email.map(String::from),
4947                            options: Some(proto::commit::CommitOptions {
4948                                amend: options.amend,
4949                                signoff: options.signoff,
4950                            }),
4951                            askpass_id,
4952                        })
4953                        .await
4954                        .context("sending commit request")?;
4955
4956                    Ok(())
4957                }
4958            }
4959        })
4960    }
4961
4962    pub fn fetch(
4963        &mut self,
4964        fetch_options: FetchOptions,
4965        askpass: AskPassDelegate,
4966        _cx: &mut App,
4967    ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
4968        let askpass_delegates = self.askpass_delegates.clone();
4969        let askpass_id = util::post_inc(&mut self.latest_askpass_id);
4970        let id = self.id;
4971
4972        self.send_job(Some("git fetch".into()), move |git_repo, cx| async move {
4973            match git_repo {
4974                RepositoryState::Local(LocalRepositoryState {
4975                    backend,
4976                    environment,
4977                    ..
4978                }) => backend.fetch(fetch_options, askpass, environment, cx).await,
4979                RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
4980                    askpass_delegates.lock().insert(askpass_id, askpass);
4981                    let _defer = util::defer(|| {
4982                        let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
4983                        debug_assert!(askpass_delegate.is_some());
4984                    });
4985
4986                    let response = client
4987                        .request(proto::Fetch {
4988                            project_id: project_id.0,
4989                            repository_id: id.to_proto(),
4990                            askpass_id,
4991                            remote: fetch_options.to_proto(),
4992                        })
4993                        .await
4994                        .context("sending fetch request")?;
4995
4996                    Ok(RemoteCommandOutput {
4997                        stdout: response.stdout,
4998                        stderr: response.stderr,
4999                    })
5000                }
5001            }
5002        })
5003    }
5004
5005    pub fn push(
5006        &mut self,
5007        branch: SharedString,
5008        remote_branch: SharedString,
5009        remote: SharedString,
5010        options: Option<PushOptions>,
5011        askpass: AskPassDelegate,
5012        cx: &mut Context<Self>,
5013    ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
5014        let askpass_delegates = self.askpass_delegates.clone();
5015        let askpass_id = util::post_inc(&mut self.latest_askpass_id);
5016        let id = self.id;
5017
5018        let args = options
5019            .map(|option| match option {
5020                PushOptions::SetUpstream => " --set-upstream",
5021                PushOptions::Force => " --force-with-lease",
5022            })
5023            .unwrap_or("");
5024
5025        let updates_tx = self
5026            .git_store()
5027            .and_then(|git_store| match &git_store.read(cx).state {
5028                GitStoreState::Local { downstream, .. } => downstream
5029                    .as_ref()
5030                    .map(|downstream| downstream.updates_tx.clone()),
5031                _ => None,
5032            });
5033
5034        let this = cx.weak_entity();
5035        self.send_job(
5036            Some(format!("git push {} {} {}:{}", args, remote, branch, remote_branch).into()),
5037            move |git_repo, mut cx| async move {
5038                match git_repo {
5039                    RepositoryState::Local(LocalRepositoryState {
5040                        backend,
5041                        environment,
5042                        ..
5043                    }) => {
5044                        let result = backend
5045                            .push(
5046                                branch.to_string(),
5047                                remote_branch.to_string(),
5048                                remote.to_string(),
5049                                options,
5050                                askpass,
5051                                environment.clone(),
5052                                cx.clone(),
5053                            )
5054                            .await;
5055                        // TODO would be nice to not have to do this manually
5056                        if result.is_ok() {
5057                            let branches = backend.branches().await?;
5058                            let branch = branches.into_iter().find(|branch| branch.is_head);
5059                            log::info!("head branch after scan is {branch:?}");
5060                            let snapshot = this.update(&mut cx, |this, cx| {
5061                                this.snapshot.branch = branch;
5062                                cx.emit(RepositoryEvent::BranchChanged);
5063                                this.snapshot.clone()
5064                            })?;
5065                            if let Some(updates_tx) = updates_tx {
5066                                updates_tx
5067                                    .unbounded_send(DownstreamUpdate::UpdateRepository(snapshot))
5068                                    .ok();
5069                            }
5070                        }
5071                        result
5072                    }
5073                    RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5074                        askpass_delegates.lock().insert(askpass_id, askpass);
5075                        let _defer = util::defer(|| {
5076                            let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
5077                            debug_assert!(askpass_delegate.is_some());
5078                        });
5079                        let response = client
5080                            .request(proto::Push {
5081                                project_id: project_id.0,
5082                                repository_id: id.to_proto(),
5083                                askpass_id,
5084                                branch_name: branch.to_string(),
5085                                remote_branch_name: remote_branch.to_string(),
5086                                remote_name: remote.to_string(),
5087                                options: options.map(|options| match options {
5088                                    PushOptions::Force => proto::push::PushOptions::Force,
5089                                    PushOptions::SetUpstream => {
5090                                        proto::push::PushOptions::SetUpstream
5091                                    }
5092                                }
5093                                    as i32),
5094                            })
5095                            .await
5096                            .context("sending push request")?;
5097
5098                        Ok(RemoteCommandOutput {
5099                            stdout: response.stdout,
5100                            stderr: response.stderr,
5101                        })
5102                    }
5103                }
5104            },
5105        )
5106    }
5107
5108    pub fn pull(
5109        &mut self,
5110        branch: Option<SharedString>,
5111        remote: SharedString,
5112        rebase: bool,
5113        askpass: AskPassDelegate,
5114        _cx: &mut App,
5115    ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
5116        let askpass_delegates = self.askpass_delegates.clone();
5117        let askpass_id = util::post_inc(&mut self.latest_askpass_id);
5118        let id = self.id;
5119
5120        let mut status = "git pull".to_string();
5121        if rebase {
5122            status.push_str(" --rebase");
5123        }
5124        status.push_str(&format!(" {}", remote));
5125        if let Some(b) = &branch {
5126            status.push_str(&format!(" {}", b));
5127        }
5128
5129        self.send_job(Some(status.into()), move |git_repo, cx| async move {
5130            match git_repo {
5131                RepositoryState::Local(LocalRepositoryState {
5132                    backend,
5133                    environment,
5134                    ..
5135                }) => {
5136                    backend
5137                        .pull(
5138                            branch.as_ref().map(|b| b.to_string()),
5139                            remote.to_string(),
5140                            rebase,
5141                            askpass,
5142                            environment.clone(),
5143                            cx,
5144                        )
5145                        .await
5146                }
5147                RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5148                    askpass_delegates.lock().insert(askpass_id, askpass);
5149                    let _defer = util::defer(|| {
5150                        let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
5151                        debug_assert!(askpass_delegate.is_some());
5152                    });
5153                    let response = client
5154                        .request(proto::Pull {
5155                            project_id: project_id.0,
5156                            repository_id: id.to_proto(),
5157                            askpass_id,
5158                            rebase,
5159                            branch_name: branch.as_ref().map(|b| b.to_string()),
5160                            remote_name: remote.to_string(),
5161                        })
5162                        .await
5163                        .context("sending pull request")?;
5164
5165                    Ok(RemoteCommandOutput {
5166                        stdout: response.stdout,
5167                        stderr: response.stderr,
5168                    })
5169                }
5170            }
5171        })
5172    }
5173
5174    fn spawn_set_index_text_job(
5175        &mut self,
5176        path: RepoPath,
5177        content: Option<String>,
5178        hunk_staging_operation_count: Option<usize>,
5179        cx: &mut Context<Self>,
5180    ) -> oneshot::Receiver<anyhow::Result<()>> {
5181        let id = self.id;
5182        let this = cx.weak_entity();
5183        let git_store = self.git_store.clone();
5184        let abs_path = self.snapshot.repo_path_to_abs_path(&path);
5185        self.send_keyed_job(
5186            Some(GitJobKey::WriteIndex(vec![path.clone()])),
5187            None,
5188            move |git_repo, mut cx| async move {
5189                log::debug!(
5190                    "start updating index text for buffer {}",
5191                    path.as_unix_str()
5192                );
5193
5194                match git_repo {
5195                    RepositoryState::Local(LocalRepositoryState {
5196                        fs,
5197                        backend,
5198                        environment,
5199                        ..
5200                    }) => {
5201                        let executable = match fs.metadata(&abs_path).await {
5202                            Ok(Some(meta)) => meta.is_executable,
5203                            Ok(None) => false,
5204                            Err(_err) => false,
5205                        };
5206                        backend
5207                            .set_index_text(path.clone(), content, environment.clone(), executable)
5208                            .await?;
5209                    }
5210                    RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5211                        client
5212                            .request(proto::SetIndexText {
5213                                project_id: project_id.0,
5214                                repository_id: id.to_proto(),
5215                                path: path.to_proto(),
5216                                text: content,
5217                            })
5218                            .await?;
5219                    }
5220                }
5221                log::debug!(
5222                    "finish updating index text for buffer {}",
5223                    path.as_unix_str()
5224                );
5225
5226                if let Some(hunk_staging_operation_count) = hunk_staging_operation_count {
5227                    let project_path = this
5228                        .read_with(&cx, |this, cx| this.repo_path_to_project_path(&path, cx))
5229                        .ok()
5230                        .flatten();
5231                    git_store
5232                        .update(&mut cx, |git_store, cx| {
5233                            let buffer_id = git_store
5234                                .buffer_store
5235                                .read(cx)
5236                                .get_by_path(&project_path?)?
5237                                .read(cx)
5238                                .remote_id();
5239                            let diff_state = git_store.diffs.get(&buffer_id)?;
5240                            diff_state.update(cx, |diff_state, _| {
5241                                diff_state.hunk_staging_operation_count_as_of_write =
5242                                    hunk_staging_operation_count;
5243                            });
5244                            Some(())
5245                        })
5246                        .context("Git store dropped")?;
5247                }
5248                Ok(())
5249            },
5250        )
5251    }
5252
5253    pub fn create_remote(
5254        &mut self,
5255        remote_name: String,
5256        remote_url: String,
5257    ) -> oneshot::Receiver<Result<()>> {
5258        let id = self.id;
5259        self.send_job(
5260            Some(format!("git remote add {remote_name} {remote_url}").into()),
5261            move |repo, _cx| async move {
5262                match repo {
5263                    RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5264                        backend.create_remote(remote_name, remote_url).await
5265                    }
5266                    RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5267                        client
5268                            .request(proto::GitCreateRemote {
5269                                project_id: project_id.0,
5270                                repository_id: id.to_proto(),
5271                                remote_name,
5272                                remote_url,
5273                            })
5274                            .await?;
5275
5276                        Ok(())
5277                    }
5278                }
5279            },
5280        )
5281    }
5282
5283    pub fn remove_remote(&mut self, remote_name: String) -> oneshot::Receiver<Result<()>> {
5284        let id = self.id;
5285        self.send_job(
5286            Some(format!("git remove remote {remote_name}").into()),
5287            move |repo, _cx| async move {
5288                match repo {
5289                    RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5290                        backend.remove_remote(remote_name).await
5291                    }
5292                    RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5293                        client
5294                            .request(proto::GitRemoveRemote {
5295                                project_id: project_id.0,
5296                                repository_id: id.to_proto(),
5297                                remote_name,
5298                            })
5299                            .await?;
5300
5301                        Ok(())
5302                    }
5303                }
5304            },
5305        )
5306    }
5307
5308    pub fn get_remotes(
5309        &mut self,
5310        branch_name: Option<String>,
5311        is_push: bool,
5312    ) -> oneshot::Receiver<Result<Vec<Remote>>> {
5313        let id = self.id;
5314        self.send_job(None, move |repo, _cx| async move {
5315            match repo {
5316                RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5317                    let remote = if let Some(branch_name) = branch_name {
5318                        if is_push {
5319                            backend.get_push_remote(branch_name).await?
5320                        } else {
5321                            backend.get_branch_remote(branch_name).await?
5322                        }
5323                    } else {
5324                        None
5325                    };
5326
5327                    match remote {
5328                        Some(remote) => Ok(vec![remote]),
5329                        None => backend.get_all_remotes().await,
5330                    }
5331                }
5332                RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5333                    let response = client
5334                        .request(proto::GetRemotes {
5335                            project_id: project_id.0,
5336                            repository_id: id.to_proto(),
5337                            branch_name,
5338                            is_push,
5339                        })
5340                        .await?;
5341
5342                    let remotes = response
5343                        .remotes
5344                        .into_iter()
5345                        .map(|remotes| Remote {
5346                            name: remotes.name.into(),
5347                        })
5348                        .collect();
5349
5350                    Ok(remotes)
5351                }
5352            }
5353        })
5354    }
5355
5356    pub fn branches(&mut self) -> oneshot::Receiver<Result<Vec<Branch>>> {
5357        let id = self.id;
5358        self.send_job(None, move |repo, _| async move {
5359            match repo {
5360                RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5361                    backend.branches().await
5362                }
5363                RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5364                    let response = client
5365                        .request(proto::GitGetBranches {
5366                            project_id: project_id.0,
5367                            repository_id: id.to_proto(),
5368                        })
5369                        .await?;
5370
5371                    let branches = response
5372                        .branches
5373                        .into_iter()
5374                        .map(|branch| proto_to_branch(&branch))
5375                        .collect();
5376
5377                    Ok(branches)
5378                }
5379            }
5380        })
5381    }
5382
5383    pub fn worktrees(&mut self) -> oneshot::Receiver<Result<Vec<GitWorktree>>> {
5384        let id = self.id;
5385        self.send_job(None, move |repo, _| async move {
5386            match repo {
5387                RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5388                    backend.worktrees().await
5389                }
5390                RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5391                    let response = client
5392                        .request(proto::GitGetWorktrees {
5393                            project_id: project_id.0,
5394                            repository_id: id.to_proto(),
5395                        })
5396                        .await?;
5397
5398                    let worktrees = response
5399                        .worktrees
5400                        .into_iter()
5401                        .map(|worktree| proto_to_worktree(&worktree))
5402                        .collect();
5403
5404                    Ok(worktrees)
5405                }
5406            }
5407        })
5408    }
5409
5410    pub fn create_worktree(
5411        &mut self,
5412        name: String,
5413        path: PathBuf,
5414        commit: Option<String>,
5415    ) -> oneshot::Receiver<Result<()>> {
5416        let id = self.id;
5417        self.send_job(
5418            Some("git worktree add".into()),
5419            move |repo, _cx| async move {
5420                match repo {
5421                    RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5422                        backend.create_worktree(name, path, commit).await
5423                    }
5424                    RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5425                        client
5426                            .request(proto::GitCreateWorktree {
5427                                project_id: project_id.0,
5428                                repository_id: id.to_proto(),
5429                                name,
5430                                directory: path.to_string_lossy().to_string(),
5431                                commit,
5432                            })
5433                            .await?;
5434
5435                        Ok(())
5436                    }
5437                }
5438            },
5439        )
5440    }
5441
5442    pub fn default_branch(
5443        &mut self,
5444        include_remote_name: bool,
5445    ) -> oneshot::Receiver<Result<Option<SharedString>>> {
5446        let id = self.id;
5447        self.send_job(None, move |repo, _| async move {
5448            match repo {
5449                RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5450                    backend.default_branch(include_remote_name).await
5451                }
5452                RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5453                    let response = client
5454                        .request(proto::GetDefaultBranch {
5455                            project_id: project_id.0,
5456                            repository_id: id.to_proto(),
5457                        })
5458                        .await?;
5459
5460                    anyhow::Ok(response.branch.map(SharedString::from))
5461                }
5462            }
5463        })
5464    }
5465
5466    pub fn diff_tree(
5467        &mut self,
5468        diff_type: DiffTreeType,
5469        _cx: &App,
5470    ) -> oneshot::Receiver<Result<TreeDiff>> {
5471        let repository_id = self.snapshot.id;
5472        self.send_job(None, move |repo, _cx| async move {
5473            match repo {
5474                RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5475                    backend.diff_tree(diff_type).await
5476                }
5477                RepositoryState::Remote(RemoteRepositoryState { client, project_id }) => {
5478                    let response = client
5479                        .request(proto::GetTreeDiff {
5480                            project_id: project_id.0,
5481                            repository_id: repository_id.0,
5482                            is_merge: matches!(diff_type, DiffTreeType::MergeBase { .. }),
5483                            base: diff_type.base().to_string(),
5484                            head: diff_type.head().to_string(),
5485                        })
5486                        .await?;
5487
5488                    let entries = response
5489                        .entries
5490                        .into_iter()
5491                        .filter_map(|entry| {
5492                            let status = match entry.status() {
5493                                proto::tree_diff_status::Status::Added => TreeDiffStatus::Added,
5494                                proto::tree_diff_status::Status::Modified => {
5495                                    TreeDiffStatus::Modified {
5496                                        old: git::Oid::from_str(
5497                                            &entry.oid.context("missing oid").log_err()?,
5498                                        )
5499                                        .log_err()?,
5500                                    }
5501                                }
5502                                proto::tree_diff_status::Status::Deleted => {
5503                                    TreeDiffStatus::Deleted {
5504                                        old: git::Oid::from_str(
5505                                            &entry.oid.context("missing oid").log_err()?,
5506                                        )
5507                                        .log_err()?,
5508                                    }
5509                                }
5510                            };
5511                            Some((
5512                                RepoPath::from_rel_path(
5513                                    &RelPath::from_proto(&entry.path).log_err()?,
5514                                ),
5515                                status,
5516                            ))
5517                        })
5518                        .collect();
5519
5520                    Ok(TreeDiff { entries })
5521                }
5522            }
5523        })
5524    }
5525
5526    pub fn diff(&mut self, diff_type: DiffType, _cx: &App) -> oneshot::Receiver<Result<String>> {
5527        let id = self.id;
5528        self.send_job(None, move |repo, _cx| async move {
5529            match repo {
5530                RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5531                    backend.diff(diff_type).await
5532                }
5533                RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5534                    let response = client
5535                        .request(proto::GitDiff {
5536                            project_id: project_id.0,
5537                            repository_id: id.to_proto(),
5538                            diff_type: match diff_type {
5539                                DiffType::HeadToIndex => {
5540                                    proto::git_diff::DiffType::HeadToIndex.into()
5541                                }
5542                                DiffType::HeadToWorktree => {
5543                                    proto::git_diff::DiffType::HeadToWorktree.into()
5544                                }
5545                            },
5546                        })
5547                        .await?;
5548
5549                    Ok(response.diff)
5550                }
5551            }
5552        })
5553    }
5554
5555    pub fn create_branch(
5556        &mut self,
5557        branch_name: String,
5558        base_branch: Option<String>,
5559    ) -> oneshot::Receiver<Result<()>> {
5560        let id = self.id;
5561        let status_msg = if let Some(ref base) = base_branch {
5562            format!("git switch -c {branch_name} {base}").into()
5563        } else {
5564            format!("git switch -c {branch_name}").into()
5565        };
5566        self.send_job(Some(status_msg), move |repo, _cx| async move {
5567            match repo {
5568                RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5569                    backend.create_branch(branch_name, base_branch).await
5570                }
5571                RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5572                    client
5573                        .request(proto::GitCreateBranch {
5574                            project_id: project_id.0,
5575                            repository_id: id.to_proto(),
5576                            branch_name,
5577                        })
5578                        .await?;
5579
5580                    Ok(())
5581                }
5582            }
5583        })
5584    }
5585
5586    pub fn change_branch(&mut self, branch_name: String) -> oneshot::Receiver<Result<()>> {
5587        let id = self.id;
5588        self.send_job(
5589            Some(format!("git switch {branch_name}").into()),
5590            move |repo, _cx| async move {
5591                match repo {
5592                    RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5593                        backend.change_branch(branch_name).await
5594                    }
5595                    RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5596                        client
5597                            .request(proto::GitChangeBranch {
5598                                project_id: project_id.0,
5599                                repository_id: id.to_proto(),
5600                                branch_name,
5601                            })
5602                            .await?;
5603
5604                        Ok(())
5605                    }
5606                }
5607            },
5608        )
5609    }
5610
5611    pub fn delete_branch(&mut self, branch_name: String) -> oneshot::Receiver<Result<()>> {
5612        let id = self.id;
5613        self.send_job(
5614            Some(format!("git branch -d {branch_name}").into()),
5615            move |repo, _cx| async move {
5616                match repo {
5617                    RepositoryState::Local(state) => state.backend.delete_branch(branch_name).await,
5618                    RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5619                        client
5620                            .request(proto::GitDeleteBranch {
5621                                project_id: project_id.0,
5622                                repository_id: id.to_proto(),
5623                                branch_name,
5624                            })
5625                            .await?;
5626
5627                        Ok(())
5628                    }
5629                }
5630            },
5631        )
5632    }
5633
5634    pub fn rename_branch(
5635        &mut self,
5636        branch: String,
5637        new_name: String,
5638    ) -> oneshot::Receiver<Result<()>> {
5639        let id = self.id;
5640        self.send_job(
5641            Some(format!("git branch -m {branch} {new_name}").into()),
5642            move |repo, _cx| async move {
5643                match repo {
5644                    RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5645                        backend.rename_branch(branch, new_name).await
5646                    }
5647                    RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5648                        client
5649                            .request(proto::GitRenameBranch {
5650                                project_id: project_id.0,
5651                                repository_id: id.to_proto(),
5652                                branch,
5653                                new_name,
5654                            })
5655                            .await?;
5656
5657                        Ok(())
5658                    }
5659                }
5660            },
5661        )
5662    }
5663
5664    pub fn check_for_pushed_commits(&mut self) -> oneshot::Receiver<Result<Vec<SharedString>>> {
5665        let id = self.id;
5666        self.send_job(None, move |repo, _cx| async move {
5667            match repo {
5668                RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5669                    backend.check_for_pushed_commit().await
5670                }
5671                RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5672                    let response = client
5673                        .request(proto::CheckForPushedCommits {
5674                            project_id: project_id.0,
5675                            repository_id: id.to_proto(),
5676                        })
5677                        .await?;
5678
5679                    let branches = response.pushed_to.into_iter().map(Into::into).collect();
5680
5681                    Ok(branches)
5682                }
5683            }
5684        })
5685    }
5686
5687    pub fn checkpoint(&mut self) -> oneshot::Receiver<Result<GitRepositoryCheckpoint>> {
5688        self.send_job(None, |repo, _cx| async move {
5689            match repo {
5690                RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5691                    backend.checkpoint().await
5692                }
5693                RepositoryState::Remote(..) => anyhow::bail!("not implemented yet"),
5694            }
5695        })
5696    }
5697
5698    pub fn restore_checkpoint(
5699        &mut self,
5700        checkpoint: GitRepositoryCheckpoint,
5701    ) -> oneshot::Receiver<Result<()>> {
5702        self.send_job(None, move |repo, _cx| async move {
5703            match repo {
5704                RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5705                    backend.restore_checkpoint(checkpoint).await
5706                }
5707                RepositoryState::Remote { .. } => anyhow::bail!("not implemented yet"),
5708            }
5709        })
5710    }
5711
5712    pub(crate) fn apply_remote_update(
5713        &mut self,
5714        update: proto::UpdateRepository,
5715        cx: &mut Context<Self>,
5716    ) -> Result<()> {
5717        let conflicted_paths = TreeSet::from_ordered_entries(
5718            update
5719                .current_merge_conflicts
5720                .into_iter()
5721                .filter_map(|path| RepoPath::from_proto(&path).log_err()),
5722        );
5723        let new_branch = update.branch_summary.as_ref().map(proto_to_branch);
5724        let new_head_commit = update
5725            .head_commit_details
5726            .as_ref()
5727            .map(proto_to_commit_details);
5728        if self.snapshot.branch != new_branch || self.snapshot.head_commit != new_head_commit {
5729            cx.emit(RepositoryEvent::BranchChanged)
5730        }
5731        self.snapshot.branch = new_branch;
5732        self.snapshot.head_commit = new_head_commit;
5733
5734        self.snapshot.merge.conflicted_paths = conflicted_paths;
5735        self.snapshot.merge.message = update.merge_message.map(SharedString::from);
5736        let new_stash_entries = GitStash {
5737            entries: update
5738                .stash_entries
5739                .iter()
5740                .filter_map(|entry| proto_to_stash(entry).ok())
5741                .collect(),
5742        };
5743        if self.snapshot.stash_entries != new_stash_entries {
5744            cx.emit(RepositoryEvent::StashEntriesChanged)
5745        }
5746        self.snapshot.stash_entries = new_stash_entries;
5747        self.snapshot.remote_upstream_url = update.remote_upstream_url;
5748        self.snapshot.remote_origin_url = update.remote_origin_url;
5749
5750        let edits = update
5751            .removed_statuses
5752            .into_iter()
5753            .filter_map(|path| {
5754                Some(sum_tree::Edit::Remove(PathKey(
5755                    RelPath::from_proto(&path).log_err()?,
5756                )))
5757            })
5758            .chain(
5759                update
5760                    .updated_statuses
5761                    .into_iter()
5762                    .filter_map(|updated_status| {
5763                        Some(sum_tree::Edit::Insert(updated_status.try_into().log_err()?))
5764                    }),
5765            )
5766            .collect::<Vec<_>>();
5767        if !edits.is_empty() {
5768            cx.emit(RepositoryEvent::StatusesChanged);
5769        }
5770        self.snapshot.statuses_by_path.edit(edits, ());
5771        if update.is_last_update {
5772            self.snapshot.scan_id = update.scan_id;
5773        }
5774        self.clear_pending_ops(cx);
5775        Ok(())
5776    }
5777
5778    pub fn compare_checkpoints(
5779        &mut self,
5780        left: GitRepositoryCheckpoint,
5781        right: GitRepositoryCheckpoint,
5782    ) -> oneshot::Receiver<Result<bool>> {
5783        self.send_job(None, move |repo, _cx| async move {
5784            match repo {
5785                RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5786                    backend.compare_checkpoints(left, right).await
5787                }
5788                RepositoryState::Remote { .. } => anyhow::bail!("not implemented yet"),
5789            }
5790        })
5791    }
5792
5793    pub fn diff_checkpoints(
5794        &mut self,
5795        base_checkpoint: GitRepositoryCheckpoint,
5796        target_checkpoint: GitRepositoryCheckpoint,
5797    ) -> oneshot::Receiver<Result<String>> {
5798        self.send_job(None, move |repo, _cx| async move {
5799            match repo {
5800                RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5801                    backend
5802                        .diff_checkpoints(base_checkpoint, target_checkpoint)
5803                        .await
5804                }
5805                RepositoryState::Remote { .. } => anyhow::bail!("not implemented yet"),
5806            }
5807        })
5808    }
5809
5810    fn clear_pending_ops(&mut self, cx: &mut Context<Self>) {
5811        let updated = SumTree::from_iter(
5812            self.pending_ops.iter().filter_map(|ops| {
5813                let inner_ops: Vec<PendingOp> =
5814                    ops.ops.iter().filter(|op| op.running()).cloned().collect();
5815                if inner_ops.is_empty() {
5816                    None
5817                } else {
5818                    Some(PendingOps {
5819                        repo_path: ops.repo_path.clone(),
5820                        ops: inner_ops,
5821                    })
5822                }
5823            }),
5824            (),
5825        );
5826
5827        if updated != self.pending_ops {
5828            cx.emit(RepositoryEvent::PendingOpsChanged {
5829                pending_ops: self.pending_ops.clone(),
5830            })
5831        }
5832
5833        self.pending_ops = updated;
5834    }
5835
5836    fn schedule_scan(
5837        &mut self,
5838        updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
5839        cx: &mut Context<Self>,
5840    ) {
5841        let this = cx.weak_entity();
5842        let _ = self.send_keyed_job(
5843            Some(GitJobKey::ReloadGitState),
5844            None,
5845            |state, mut cx| async move {
5846                log::debug!("run scheduled git status scan");
5847
5848                let Some(this) = this.upgrade() else {
5849                    return Ok(());
5850                };
5851                let RepositoryState::Local(LocalRepositoryState { backend, .. }) = state else {
5852                    bail!("not a local repository")
5853                };
5854                let (snapshot, events) = this
5855                    .update(&mut cx, |this, _| {
5856                        this.paths_needing_status_update.clear();
5857                        compute_snapshot(
5858                            this.id,
5859                            this.work_directory_abs_path.clone(),
5860                            this.snapshot.clone(),
5861                            backend.clone(),
5862                        )
5863                    })
5864                    .await?;
5865                this.update(&mut cx, |this, cx| {
5866                    this.snapshot = snapshot.clone();
5867                    this.clear_pending_ops(cx);
5868                    for event in events {
5869                        cx.emit(event);
5870                    }
5871                });
5872                if let Some(updates_tx) = updates_tx {
5873                    updates_tx
5874                        .unbounded_send(DownstreamUpdate::UpdateRepository(snapshot))
5875                        .ok();
5876                }
5877                Ok(())
5878            },
5879        );
5880    }
5881
5882    fn spawn_local_git_worker(
5883        state: Shared<Task<Result<LocalRepositoryState, String>>>,
5884        cx: &mut Context<Self>,
5885    ) -> mpsc::UnboundedSender<GitJob> {
5886        let (job_tx, mut job_rx) = mpsc::unbounded::<GitJob>();
5887
5888        cx.spawn(async move |_, cx| {
5889            let state = state.await.map_err(|err| anyhow::anyhow!(err))?;
5890            if let Some(git_hosting_provider_registry) =
5891                cx.update(|cx| GitHostingProviderRegistry::try_global(cx))
5892            {
5893                git_hosting_providers::register_additional_providers(
5894                    git_hosting_provider_registry,
5895                    state.backend.clone(),
5896                )
5897                .await;
5898            }
5899            let state = RepositoryState::Local(state);
5900            let mut jobs = VecDeque::new();
5901            loop {
5902                while let Ok(Some(next_job)) = job_rx.try_next() {
5903                    jobs.push_back(next_job);
5904                }
5905
5906                if let Some(job) = jobs.pop_front() {
5907                    if let Some(current_key) = &job.key
5908                        && jobs
5909                            .iter()
5910                            .any(|other_job| other_job.key.as_ref() == Some(current_key))
5911                    {
5912                        continue;
5913                    }
5914                    (job.job)(state.clone(), cx).await;
5915                } else if let Some(job) = job_rx.next().await {
5916                    jobs.push_back(job);
5917                } else {
5918                    break;
5919                }
5920            }
5921            anyhow::Ok(())
5922        })
5923        .detach_and_log_err(cx);
5924
5925        job_tx
5926    }
5927
5928    fn spawn_remote_git_worker(
5929        state: RemoteRepositoryState,
5930        cx: &mut Context<Self>,
5931    ) -> mpsc::UnboundedSender<GitJob> {
5932        let (job_tx, mut job_rx) = mpsc::unbounded::<GitJob>();
5933
5934        cx.spawn(async move |_, cx| {
5935            let state = RepositoryState::Remote(state);
5936            let mut jobs = VecDeque::new();
5937            loop {
5938                while let Ok(Some(next_job)) = job_rx.try_next() {
5939                    jobs.push_back(next_job);
5940                }
5941
5942                if let Some(job) = jobs.pop_front() {
5943                    if let Some(current_key) = &job.key
5944                        && jobs
5945                            .iter()
5946                            .any(|other_job| other_job.key.as_ref() == Some(current_key))
5947                    {
5948                        continue;
5949                    }
5950                    (job.job)(state.clone(), cx).await;
5951                } else if let Some(job) = job_rx.next().await {
5952                    jobs.push_back(job);
5953                } else {
5954                    break;
5955                }
5956            }
5957            anyhow::Ok(())
5958        })
5959        .detach_and_log_err(cx);
5960
5961        job_tx
5962    }
5963
5964    fn load_staged_text(
5965        &mut self,
5966        buffer_id: BufferId,
5967        repo_path: RepoPath,
5968        cx: &App,
5969    ) -> Task<Result<Option<String>>> {
5970        let rx = self.send_job(None, move |state, _| async move {
5971            match state {
5972                RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5973                    anyhow::Ok(backend.load_index_text(repo_path).await)
5974                }
5975                RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5976                    let response = client
5977                        .request(proto::OpenUnstagedDiff {
5978                            project_id: project_id.to_proto(),
5979                            buffer_id: buffer_id.to_proto(),
5980                        })
5981                        .await?;
5982                    Ok(response.staged_text)
5983                }
5984            }
5985        });
5986        cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
5987    }
5988
5989    fn load_committed_text(
5990        &mut self,
5991        buffer_id: BufferId,
5992        repo_path: RepoPath,
5993        cx: &App,
5994    ) -> Task<Result<DiffBasesChange>> {
5995        let rx = self.send_job(None, move |state, _| async move {
5996            match state {
5997                RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5998                    let committed_text = backend.load_committed_text(repo_path.clone()).await;
5999                    let staged_text = backend.load_index_text(repo_path).await;
6000                    let diff_bases_change = if committed_text == staged_text {
6001                        DiffBasesChange::SetBoth(committed_text)
6002                    } else {
6003                        DiffBasesChange::SetEach {
6004                            index: staged_text,
6005                            head: committed_text,
6006                        }
6007                    };
6008                    anyhow::Ok(diff_bases_change)
6009                }
6010                RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
6011                    use proto::open_uncommitted_diff_response::Mode;
6012
6013                    let response = client
6014                        .request(proto::OpenUncommittedDiff {
6015                            project_id: project_id.to_proto(),
6016                            buffer_id: buffer_id.to_proto(),
6017                        })
6018                        .await?;
6019                    let mode = Mode::from_i32(response.mode).context("Invalid mode")?;
6020                    let bases = match mode {
6021                        Mode::IndexMatchesHead => DiffBasesChange::SetBoth(response.committed_text),
6022                        Mode::IndexAndHead => DiffBasesChange::SetEach {
6023                            head: response.committed_text,
6024                            index: response.staged_text,
6025                        },
6026                    };
6027                    Ok(bases)
6028                }
6029            }
6030        });
6031
6032        cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
6033    }
6034
6035    fn load_blob_content(&mut self, oid: Oid, cx: &App) -> Task<Result<String>> {
6036        let repository_id = self.snapshot.id;
6037        let rx = self.send_job(None, move |state, _| async move {
6038            match state {
6039                RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
6040                    backend.load_blob_content(oid).await
6041                }
6042                RepositoryState::Remote(RemoteRepositoryState { client, project_id }) => {
6043                    let response = client
6044                        .request(proto::GetBlobContent {
6045                            project_id: project_id.to_proto(),
6046                            repository_id: repository_id.0,
6047                            oid: oid.to_string(),
6048                        })
6049                        .await?;
6050                    Ok(response.content)
6051                }
6052            }
6053        });
6054        cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
6055    }
6056
6057    fn paths_changed(
6058        &mut self,
6059        paths: Vec<RepoPath>,
6060        updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
6061        cx: &mut Context<Self>,
6062    ) {
6063        self.paths_needing_status_update.extend(paths);
6064
6065        let this = cx.weak_entity();
6066        let _ = self.send_keyed_job(
6067            Some(GitJobKey::RefreshStatuses),
6068            None,
6069            |state, mut cx| async move {
6070                let (prev_snapshot, mut changed_paths) = this.update(&mut cx, |this, _| {
6071                    (
6072                        this.snapshot.clone(),
6073                        mem::take(&mut this.paths_needing_status_update),
6074                    )
6075                })?;
6076                let RepositoryState::Local(LocalRepositoryState { backend, .. }) = state else {
6077                    bail!("not a local repository")
6078                };
6079
6080                let paths = changed_paths.iter().cloned().collect::<Vec<_>>();
6081                if paths.is_empty() {
6082                    return Ok(());
6083                }
6084                let statuses = backend.status(&paths).await?;
6085                let stash_entries = backend.stash_entries().await?;
6086
6087                let changed_path_statuses = cx
6088                    .background_spawn(async move {
6089                        let mut changed_path_statuses = Vec::new();
6090                        let prev_statuses = prev_snapshot.statuses_by_path.clone();
6091                        let mut cursor = prev_statuses.cursor::<PathProgress>(());
6092
6093                        for (repo_path, status) in &*statuses.entries {
6094                            changed_paths.remove(repo_path);
6095                            if cursor.seek_forward(&PathTarget::Path(repo_path), Bias::Left)
6096                                && cursor.item().is_some_and(|entry| entry.status == *status)
6097                            {
6098                                continue;
6099                            }
6100
6101                            changed_path_statuses.push(Edit::Insert(StatusEntry {
6102                                repo_path: repo_path.clone(),
6103                                status: *status,
6104                            }));
6105                        }
6106                        let mut cursor = prev_statuses.cursor::<PathProgress>(());
6107                        for path in changed_paths.into_iter() {
6108                            if cursor.seek_forward(&PathTarget::Path(&path), Bias::Left) {
6109                                changed_path_statuses
6110                                    .push(Edit::Remove(PathKey(path.as_ref().clone())));
6111                            }
6112                        }
6113                        changed_path_statuses
6114                    })
6115                    .await;
6116
6117                this.update(&mut cx, |this, cx| {
6118                    if this.snapshot.stash_entries != stash_entries {
6119                        cx.emit(RepositoryEvent::StashEntriesChanged);
6120                        this.snapshot.stash_entries = stash_entries;
6121                    }
6122
6123                    if !changed_path_statuses.is_empty() {
6124                        cx.emit(RepositoryEvent::StatusesChanged);
6125                        this.snapshot
6126                            .statuses_by_path
6127                            .edit(changed_path_statuses, ());
6128                        this.snapshot.scan_id += 1;
6129                    }
6130
6131                    if let Some(updates_tx) = updates_tx {
6132                        updates_tx
6133                            .unbounded_send(DownstreamUpdate::UpdateRepository(
6134                                this.snapshot.clone(),
6135                            ))
6136                            .ok();
6137                    }
6138                })
6139            },
6140        );
6141    }
6142
6143    /// currently running git command and when it started
6144    pub fn current_job(&self) -> Option<JobInfo> {
6145        self.active_jobs.values().next().cloned()
6146    }
6147
6148    pub fn barrier(&mut self) -> oneshot::Receiver<()> {
6149        self.send_job(None, |_, _| async {})
6150    }
6151
6152    fn spawn_job_with_tracking<AsyncFn>(
6153        &mut self,
6154        paths: Vec<RepoPath>,
6155        git_status: pending_op::GitStatus,
6156        cx: &mut Context<Self>,
6157        f: AsyncFn,
6158    ) -> Task<Result<()>>
6159    where
6160        AsyncFn: AsyncFnOnce(WeakEntity<Repository>, &mut AsyncApp) -> Result<()> + 'static,
6161    {
6162        let ids = self.new_pending_ops_for_paths(paths, git_status);
6163
6164        cx.spawn(async move |this, cx| {
6165            let (job_status, result) = match f(this.clone(), cx).await {
6166                Ok(()) => (pending_op::JobStatus::Finished, Ok(())),
6167                Err(err) if err.is::<Canceled>() => (pending_op::JobStatus::Skipped, Ok(())),
6168                Err(err) => (pending_op::JobStatus::Error, Err(err)),
6169            };
6170
6171            this.update(cx, |this, _| {
6172                let mut edits = Vec::with_capacity(ids.len());
6173                for (id, entry) in ids {
6174                    if let Some(mut ops) = this
6175                        .pending_ops
6176                        .get(&PathKey(entry.as_ref().clone()), ())
6177                        .cloned()
6178                    {
6179                        if let Some(op) = ops.op_by_id_mut(id) {
6180                            op.job_status = job_status;
6181                        }
6182                        edits.push(sum_tree::Edit::Insert(ops));
6183                    }
6184                }
6185                this.pending_ops.edit(edits, ());
6186            })?;
6187
6188            result
6189        })
6190    }
6191
6192    fn new_pending_ops_for_paths(
6193        &mut self,
6194        paths: Vec<RepoPath>,
6195        git_status: pending_op::GitStatus,
6196    ) -> Vec<(PendingOpId, RepoPath)> {
6197        let mut edits = Vec::with_capacity(paths.len());
6198        let mut ids = Vec::with_capacity(paths.len());
6199        for path in paths {
6200            let mut ops = self
6201                .pending_ops
6202                .get(&PathKey(path.as_ref().clone()), ())
6203                .cloned()
6204                .unwrap_or_else(|| PendingOps::new(&path));
6205            let id = ops.max_id() + 1;
6206            ops.ops.push(PendingOp {
6207                id,
6208                git_status,
6209                job_status: pending_op::JobStatus::Running,
6210            });
6211            edits.push(sum_tree::Edit::Insert(ops));
6212            ids.push((id, path));
6213        }
6214        self.pending_ops.edit(edits, ());
6215        ids
6216    }
6217    pub fn default_remote_url(&self) -> Option<String> {
6218        self.remote_upstream_url
6219            .clone()
6220            .or(self.remote_origin_url.clone())
6221    }
6222}
6223
6224fn get_permalink_in_rust_registry_src(
6225    provider_registry: Arc<GitHostingProviderRegistry>,
6226    path: PathBuf,
6227    selection: Range<u32>,
6228) -> Result<url::Url> {
6229    #[derive(Deserialize)]
6230    struct CargoVcsGit {
6231        sha1: String,
6232    }
6233
6234    #[derive(Deserialize)]
6235    struct CargoVcsInfo {
6236        git: CargoVcsGit,
6237        path_in_vcs: String,
6238    }
6239
6240    #[derive(Deserialize)]
6241    struct CargoPackage {
6242        repository: String,
6243    }
6244
6245    #[derive(Deserialize)]
6246    struct CargoToml {
6247        package: CargoPackage,
6248    }
6249
6250    let Some((dir, cargo_vcs_info_json)) = path.ancestors().skip(1).find_map(|dir| {
6251        let json = std::fs::read_to_string(dir.join(".cargo_vcs_info.json")).ok()?;
6252        Some((dir, json))
6253    }) else {
6254        bail!("No .cargo_vcs_info.json found in parent directories")
6255    };
6256    let cargo_vcs_info = serde_json::from_str::<CargoVcsInfo>(&cargo_vcs_info_json)?;
6257    let cargo_toml = std::fs::read_to_string(dir.join("Cargo.toml"))?;
6258    let manifest = toml::from_str::<CargoToml>(&cargo_toml)?;
6259    let (provider, remote) = parse_git_remote_url(provider_registry, &manifest.package.repository)
6260        .context("parsing package.repository field of manifest")?;
6261    let path = PathBuf::from(cargo_vcs_info.path_in_vcs).join(path.strip_prefix(dir).unwrap());
6262    let permalink = provider.build_permalink(
6263        remote,
6264        BuildPermalinkParams::new(
6265            &cargo_vcs_info.git.sha1,
6266            &RepoPath::from_rel_path(
6267                &RelPath::new(&path, PathStyle::local()).context("invalid path")?,
6268            ),
6269            Some(selection),
6270        ),
6271    );
6272    Ok(permalink)
6273}
6274
6275fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
6276    let Some(blame) = blame else {
6277        return proto::BlameBufferResponse {
6278            blame_response: None,
6279        };
6280    };
6281
6282    let entries = blame
6283        .entries
6284        .into_iter()
6285        .map(|entry| proto::BlameEntry {
6286            sha: entry.sha.as_bytes().into(),
6287            start_line: entry.range.start,
6288            end_line: entry.range.end,
6289            original_line_number: entry.original_line_number,
6290            author: entry.author,
6291            author_mail: entry.author_mail,
6292            author_time: entry.author_time,
6293            author_tz: entry.author_tz,
6294            committer: entry.committer_name,
6295            committer_mail: entry.committer_email,
6296            committer_time: entry.committer_time,
6297            committer_tz: entry.committer_tz,
6298            summary: entry.summary,
6299            previous: entry.previous,
6300            filename: entry.filename,
6301        })
6302        .collect::<Vec<_>>();
6303
6304    let messages = blame
6305        .messages
6306        .into_iter()
6307        .map(|(oid, message)| proto::CommitMessage {
6308            oid: oid.as_bytes().into(),
6309            message,
6310        })
6311        .collect::<Vec<_>>();
6312
6313    proto::BlameBufferResponse {
6314        blame_response: Some(proto::blame_buffer_response::BlameResponse { entries, messages }),
6315    }
6316}
6317
6318fn deserialize_blame_buffer_response(
6319    response: proto::BlameBufferResponse,
6320) -> Option<git::blame::Blame> {
6321    let response = response.blame_response?;
6322    let entries = response
6323        .entries
6324        .into_iter()
6325        .filter_map(|entry| {
6326            Some(git::blame::BlameEntry {
6327                sha: git::Oid::from_bytes(&entry.sha).ok()?,
6328                range: entry.start_line..entry.end_line,
6329                original_line_number: entry.original_line_number,
6330                committer_name: entry.committer,
6331                committer_time: entry.committer_time,
6332                committer_tz: entry.committer_tz,
6333                committer_email: entry.committer_mail,
6334                author: entry.author,
6335                author_mail: entry.author_mail,
6336                author_time: entry.author_time,
6337                author_tz: entry.author_tz,
6338                summary: entry.summary,
6339                previous: entry.previous,
6340                filename: entry.filename,
6341            })
6342        })
6343        .collect::<Vec<_>>();
6344
6345    let messages = response
6346        .messages
6347        .into_iter()
6348        .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
6349        .collect::<HashMap<_, _>>();
6350
6351    Some(Blame { entries, messages })
6352}
6353
6354fn branch_to_proto(branch: &git::repository::Branch) -> proto::Branch {
6355    proto::Branch {
6356        is_head: branch.is_head,
6357        ref_name: branch.ref_name.to_string(),
6358        unix_timestamp: branch
6359            .most_recent_commit
6360            .as_ref()
6361            .map(|commit| commit.commit_timestamp as u64),
6362        upstream: branch.upstream.as_ref().map(|upstream| proto::GitUpstream {
6363            ref_name: upstream.ref_name.to_string(),
6364            tracking: upstream
6365                .tracking
6366                .status()
6367                .map(|upstream| proto::UpstreamTracking {
6368                    ahead: upstream.ahead as u64,
6369                    behind: upstream.behind as u64,
6370                }),
6371        }),
6372        most_recent_commit: branch
6373            .most_recent_commit
6374            .as_ref()
6375            .map(|commit| proto::CommitSummary {
6376                sha: commit.sha.to_string(),
6377                subject: commit.subject.to_string(),
6378                commit_timestamp: commit.commit_timestamp,
6379                author_name: commit.author_name.to_string(),
6380            }),
6381    }
6382}
6383
6384fn worktree_to_proto(worktree: &git::repository::Worktree) -> proto::Worktree {
6385    proto::Worktree {
6386        path: worktree.path.to_string_lossy().to_string(),
6387        ref_name: worktree.ref_name.to_string(),
6388        sha: worktree.sha.to_string(),
6389    }
6390}
6391
6392fn proto_to_worktree(proto: &proto::Worktree) -> git::repository::Worktree {
6393    git::repository::Worktree {
6394        path: PathBuf::from(proto.path.clone()),
6395        ref_name: proto.ref_name.clone().into(),
6396        sha: proto.sha.clone().into(),
6397    }
6398}
6399
6400fn proto_to_branch(proto: &proto::Branch) -> git::repository::Branch {
6401    git::repository::Branch {
6402        is_head: proto.is_head,
6403        ref_name: proto.ref_name.clone().into(),
6404        upstream: proto
6405            .upstream
6406            .as_ref()
6407            .map(|upstream| git::repository::Upstream {
6408                ref_name: upstream.ref_name.to_string().into(),
6409                tracking: upstream
6410                    .tracking
6411                    .as_ref()
6412                    .map(|tracking| {
6413                        git::repository::UpstreamTracking::Tracked(UpstreamTrackingStatus {
6414                            ahead: tracking.ahead as u32,
6415                            behind: tracking.behind as u32,
6416                        })
6417                    })
6418                    .unwrap_or(git::repository::UpstreamTracking::Gone),
6419            }),
6420        most_recent_commit: proto.most_recent_commit.as_ref().map(|commit| {
6421            git::repository::CommitSummary {
6422                sha: commit.sha.to_string().into(),
6423                subject: commit.subject.to_string().into(),
6424                commit_timestamp: commit.commit_timestamp,
6425                author_name: commit.author_name.to_string().into(),
6426                has_parent: true,
6427            }
6428        }),
6429    }
6430}
6431
6432fn commit_details_to_proto(commit: &CommitDetails) -> proto::GitCommitDetails {
6433    proto::GitCommitDetails {
6434        sha: commit.sha.to_string(),
6435        message: commit.message.to_string(),
6436        commit_timestamp: commit.commit_timestamp,
6437        author_email: commit.author_email.to_string(),
6438        author_name: commit.author_name.to_string(),
6439    }
6440}
6441
6442fn proto_to_commit_details(proto: &proto::GitCommitDetails) -> CommitDetails {
6443    CommitDetails {
6444        sha: proto.sha.clone().into(),
6445        message: proto.message.clone().into(),
6446        commit_timestamp: proto.commit_timestamp,
6447        author_email: proto.author_email.clone().into(),
6448        author_name: proto.author_name.clone().into(),
6449    }
6450}
6451
6452async fn compute_snapshot(
6453    id: RepositoryId,
6454    work_directory_abs_path: Arc<Path>,
6455    prev_snapshot: RepositorySnapshot,
6456    backend: Arc<dyn GitRepository>,
6457) -> Result<(RepositorySnapshot, Vec<RepositoryEvent>)> {
6458    let mut events = Vec::new();
6459    let branches = backend.branches().await?;
6460    let branch = branches.into_iter().find(|branch| branch.is_head);
6461    let statuses = backend
6462        .status(&[RepoPath::from_rel_path(
6463            &RelPath::new(".".as_ref(), PathStyle::local()).unwrap(),
6464        )])
6465        .await?;
6466    let stash_entries = backend.stash_entries().await?;
6467    let statuses_by_path = SumTree::from_iter(
6468        statuses
6469            .entries
6470            .iter()
6471            .map(|(repo_path, status)| StatusEntry {
6472                repo_path: repo_path.clone(),
6473                status: *status,
6474            }),
6475        (),
6476    );
6477    let (merge_details, merge_heads_changed) =
6478        MergeDetails::load(&backend, &statuses_by_path, &prev_snapshot).await?;
6479    log::debug!("new merge details (changed={merge_heads_changed:?}): {merge_details:?}");
6480
6481    if merge_heads_changed {
6482        events.push(RepositoryEvent::MergeHeadsChanged);
6483    }
6484
6485    if statuses_by_path != prev_snapshot.statuses_by_path {
6486        events.push(RepositoryEvent::StatusesChanged)
6487    }
6488
6489    // Useful when branch is None in detached head state
6490    let head_commit = match backend.head_sha().await {
6491        Some(head_sha) => backend.show(head_sha).await.log_err(),
6492        None => None,
6493    };
6494
6495    if branch != prev_snapshot.branch || head_commit != prev_snapshot.head_commit {
6496        events.push(RepositoryEvent::BranchChanged);
6497    }
6498
6499    let remote_origin_url = backend.remote_url("origin").await;
6500    let remote_upstream_url = backend.remote_url("upstream").await;
6501
6502    let snapshot = RepositorySnapshot {
6503        id,
6504        statuses_by_path,
6505        work_directory_abs_path,
6506        path_style: prev_snapshot.path_style,
6507        scan_id: prev_snapshot.scan_id + 1,
6508        branch,
6509        head_commit,
6510        merge: merge_details,
6511        remote_origin_url,
6512        remote_upstream_url,
6513        stash_entries,
6514    };
6515
6516    Ok((snapshot, events))
6517}
6518
6519fn status_from_proto(
6520    simple_status: i32,
6521    status: Option<proto::GitFileStatus>,
6522) -> anyhow::Result<FileStatus> {
6523    use proto::git_file_status::Variant;
6524
6525    let Some(variant) = status.and_then(|status| status.variant) else {
6526        let code = proto::GitStatus::from_i32(simple_status)
6527            .with_context(|| format!("Invalid git status code: {simple_status}"))?;
6528        let result = match code {
6529            proto::GitStatus::Added => TrackedStatus {
6530                worktree_status: StatusCode::Added,
6531                index_status: StatusCode::Unmodified,
6532            }
6533            .into(),
6534            proto::GitStatus::Modified => TrackedStatus {
6535                worktree_status: StatusCode::Modified,
6536                index_status: StatusCode::Unmodified,
6537            }
6538            .into(),
6539            proto::GitStatus::Conflict => UnmergedStatus {
6540                first_head: UnmergedStatusCode::Updated,
6541                second_head: UnmergedStatusCode::Updated,
6542            }
6543            .into(),
6544            proto::GitStatus::Deleted => TrackedStatus {
6545                worktree_status: StatusCode::Deleted,
6546                index_status: StatusCode::Unmodified,
6547            }
6548            .into(),
6549            _ => anyhow::bail!("Invalid code for simple status: {simple_status}"),
6550        };
6551        return Ok(result);
6552    };
6553
6554    let result = match variant {
6555        Variant::Untracked(_) => FileStatus::Untracked,
6556        Variant::Ignored(_) => FileStatus::Ignored,
6557        Variant::Unmerged(unmerged) => {
6558            let [first_head, second_head] =
6559                [unmerged.first_head, unmerged.second_head].map(|head| {
6560                    let code = proto::GitStatus::from_i32(head)
6561                        .with_context(|| format!("Invalid git status code: {head}"))?;
6562                    let result = match code {
6563                        proto::GitStatus::Added => UnmergedStatusCode::Added,
6564                        proto::GitStatus::Updated => UnmergedStatusCode::Updated,
6565                        proto::GitStatus::Deleted => UnmergedStatusCode::Deleted,
6566                        _ => anyhow::bail!("Invalid code for unmerged status: {code:?}"),
6567                    };
6568                    Ok(result)
6569                });
6570            let [first_head, second_head] = [first_head?, second_head?];
6571            UnmergedStatus {
6572                first_head,
6573                second_head,
6574            }
6575            .into()
6576        }
6577        Variant::Tracked(tracked) => {
6578            let [index_status, worktree_status] = [tracked.index_status, tracked.worktree_status]
6579                .map(|status| {
6580                    let code = proto::GitStatus::from_i32(status)
6581                        .with_context(|| format!("Invalid git status code: {status}"))?;
6582                    let result = match code {
6583                        proto::GitStatus::Modified => StatusCode::Modified,
6584                        proto::GitStatus::TypeChanged => StatusCode::TypeChanged,
6585                        proto::GitStatus::Added => StatusCode::Added,
6586                        proto::GitStatus::Deleted => StatusCode::Deleted,
6587                        proto::GitStatus::Renamed => StatusCode::Renamed,
6588                        proto::GitStatus::Copied => StatusCode::Copied,
6589                        proto::GitStatus::Unmodified => StatusCode::Unmodified,
6590                        _ => anyhow::bail!("Invalid code for tracked status: {code:?}"),
6591                    };
6592                    Ok(result)
6593                });
6594            let [index_status, worktree_status] = [index_status?, worktree_status?];
6595            TrackedStatus {
6596                index_status,
6597                worktree_status,
6598            }
6599            .into()
6600        }
6601    };
6602    Ok(result)
6603}
6604
6605fn status_to_proto(status: FileStatus) -> proto::GitFileStatus {
6606    use proto::git_file_status::{Tracked, Unmerged, Variant};
6607
6608    let variant = match status {
6609        FileStatus::Untracked => Variant::Untracked(Default::default()),
6610        FileStatus::Ignored => Variant::Ignored(Default::default()),
6611        FileStatus::Unmerged(UnmergedStatus {
6612            first_head,
6613            second_head,
6614        }) => Variant::Unmerged(Unmerged {
6615            first_head: unmerged_status_to_proto(first_head),
6616            second_head: unmerged_status_to_proto(second_head),
6617        }),
6618        FileStatus::Tracked(TrackedStatus {
6619            index_status,
6620            worktree_status,
6621        }) => Variant::Tracked(Tracked {
6622            index_status: tracked_status_to_proto(index_status),
6623            worktree_status: tracked_status_to_proto(worktree_status),
6624        }),
6625    };
6626    proto::GitFileStatus {
6627        variant: Some(variant),
6628    }
6629}
6630
6631fn unmerged_status_to_proto(code: UnmergedStatusCode) -> i32 {
6632    match code {
6633        UnmergedStatusCode::Added => proto::GitStatus::Added as _,
6634        UnmergedStatusCode::Deleted => proto::GitStatus::Deleted as _,
6635        UnmergedStatusCode::Updated => proto::GitStatus::Updated as _,
6636    }
6637}
6638
6639fn tracked_status_to_proto(code: StatusCode) -> i32 {
6640    match code {
6641        StatusCode::Added => proto::GitStatus::Added as _,
6642        StatusCode::Deleted => proto::GitStatus::Deleted as _,
6643        StatusCode::Modified => proto::GitStatus::Modified as _,
6644        StatusCode::Renamed => proto::GitStatus::Renamed as _,
6645        StatusCode::TypeChanged => proto::GitStatus::TypeChanged as _,
6646        StatusCode::Copied => proto::GitStatus::Copied as _,
6647        StatusCode::Unmodified => proto::GitStatus::Unmodified as _,
6648    }
6649}