git_store.rs

   1mod conflict_set;
   2pub mod git_traversal;
   3
   4use crate::{
   5    ProjectEnvironment, ProjectItem, ProjectPath,
   6    buffer_store::{BufferStore, BufferStoreEvent},
   7    worktree_store::{WorktreeStore, WorktreeStoreEvent},
   8};
   9use anyhow::{Context as _, Result, anyhow, bail};
  10use askpass::AskPassDelegate;
  11use buffer_diff::{BufferDiff, BufferDiffEvent};
  12use client::ProjectId;
  13use collections::HashMap;
  14pub use conflict_set::{ConflictRegion, ConflictSet, ConflictSetSnapshot, ConflictSetUpdate};
  15use fs::Fs;
  16use futures::{
  17    FutureExt, StreamExt,
  18    channel::{mpsc, oneshot},
  19    future::{self, Shared},
  20    stream::FuturesOrdered,
  21};
  22use git::{
  23    BuildPermalinkParams, GitHostingProviderRegistry, WORK_DIRECTORY_REPO_PATH,
  24    blame::Blame,
  25    parse_git_remote_url,
  26    repository::{
  27        Branch, CommitDetails, CommitDiff, CommitFile, CommitOptions, DiffType, FetchOptions,
  28        GitRepository, GitRepositoryCheckpoint, PushOptions, Remote, RemoteCommandOutput, RepoPath,
  29        ResetMode, UpstreamTrackingStatus,
  30    },
  31    status::{
  32        FileStatus, GitSummary, StatusCode, TrackedStatus, UnmergedStatus, UnmergedStatusCode,
  33    },
  34};
  35use gpui::{
  36    App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Subscription, Task,
  37    WeakEntity,
  38};
  39use language::{
  40    Buffer, BufferEvent, Language, LanguageRegistry,
  41    proto::{deserialize_version, serialize_version},
  42};
  43use parking_lot::Mutex;
  44use postage::stream::Stream as _;
  45use rpc::{
  46    AnyProtoClient, TypedEnvelope,
  47    proto::{self, FromProto, ToProto, git_reset, split_repository_update},
  48};
  49use serde::Deserialize;
  50use std::{
  51    cmp::Ordering,
  52    collections::{BTreeSet, VecDeque},
  53    future::Future,
  54    mem,
  55    ops::Range,
  56    path::{Path, PathBuf},
  57    sync::{
  58        Arc,
  59        atomic::{self, AtomicU64},
  60    },
  61    time::Instant,
  62};
  63use sum_tree::{Edit, SumTree, TreeSet};
  64use text::{Bias, BufferId};
  65use util::{ResultExt, debug_panic, paths::SanitizedPath, post_inc};
  66use worktree::{
  67    File, PathChange, PathKey, PathProgress, PathSummary, PathTarget, ProjectEntryId,
  68    UpdatedGitRepositoriesSet, UpdatedGitRepository, Worktree,
  69};
  70
  71pub struct GitStore {
  72    state: GitStoreState,
  73    buffer_store: Entity<BufferStore>,
  74    worktree_store: Entity<WorktreeStore>,
  75    repositories: HashMap<RepositoryId, Entity<Repository>>,
  76    active_repo_id: Option<RepositoryId>,
  77    #[allow(clippy::type_complexity)]
  78    loading_diffs:
  79        HashMap<(BufferId, DiffKind), Shared<Task<Result<Entity<BufferDiff>, Arc<anyhow::Error>>>>>,
  80    diffs: HashMap<BufferId, Entity<BufferGitState>>,
  81    shared_diffs: HashMap<proto::PeerId, HashMap<BufferId, SharedDiffs>>,
  82    _subscriptions: Vec<Subscription>,
  83}
  84
  85#[derive(Default)]
  86struct SharedDiffs {
  87    unstaged: Option<Entity<BufferDiff>>,
  88    uncommitted: Option<Entity<BufferDiff>>,
  89}
  90
  91struct BufferGitState {
  92    unstaged_diff: Option<WeakEntity<BufferDiff>>,
  93    uncommitted_diff: Option<WeakEntity<BufferDiff>>,
  94    branch_diff: Option<WeakEntity<BufferDiff>>,
  95    conflict_set: Option<WeakEntity<ConflictSet>>,
  96    recalculate_diff_task: Option<Task<Result<()>>>,
  97    reparse_conflict_markers_task: Option<Task<Result<()>>>,
  98    language: Option<Arc<Language>>,
  99    language_registry: Option<Arc<LanguageRegistry>>,
 100    conflict_updated_futures: Vec<oneshot::Sender<()>>,
 101    recalculating_tx: postage::watch::Sender<bool>,
 102
 103    /// These operation counts are used to ensure that head and index text
 104    /// values read from the git repository are up-to-date with any hunk staging
 105    /// operations that have been performed on the BufferDiff.
 106    ///
 107    /// The operation count is incremented immediately when the user initiates a
 108    /// hunk stage/unstage operation. Then, upon finishing writing the new index
 109    /// text do disk, the `operation count as of write` is updated to reflect
 110    /// the operation count that prompted the write.
 111    hunk_staging_operation_count: usize,
 112    hunk_staging_operation_count_as_of_write: usize,
 113
 114    head_text: Option<Arc<String>>,
 115    index_text: Option<Arc<String>>,
 116    head_changed: bool,
 117    index_changed: bool,
 118    language_changed: bool,
 119}
 120
 121#[derive(Clone, Debug)]
 122enum DiffBasesChange {
 123    SetIndex(Option<String>),
 124    SetHead(Option<String>),
 125    SetEach {
 126        index: Option<String>,
 127        head: Option<String>,
 128    },
 129    SetBoth(Option<String>),
 130}
 131
 132#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
 133enum DiffKind {
 134    Unstaged,
 135    Uncommitted,
 136}
 137
 138enum GitStoreState {
 139    Local {
 140        next_repository_id: Arc<AtomicU64>,
 141        downstream: Option<LocalDownstreamState>,
 142        project_environment: Entity<ProjectEnvironment>,
 143        fs: Arc<dyn Fs>,
 144    },
 145    Remote {
 146        upstream_client: AnyProtoClient,
 147        upstream_project_id: u64,
 148        downstream: Option<(AnyProtoClient, ProjectId)>,
 149    },
 150}
 151
 152enum DownstreamUpdate {
 153    UpdateRepository(RepositorySnapshot),
 154    RemoveRepository(RepositoryId),
 155}
 156
 157struct LocalDownstreamState {
 158    client: AnyProtoClient,
 159    project_id: ProjectId,
 160    updates_tx: mpsc::UnboundedSender<DownstreamUpdate>,
 161    _task: Task<Result<()>>,
 162}
 163
 164#[derive(Clone, Debug)]
 165pub struct GitStoreCheckpoint {
 166    checkpoints_by_work_dir_abs_path: HashMap<Arc<Path>, GitRepositoryCheckpoint>,
 167}
 168
 169#[derive(Clone, Debug, PartialEq, Eq)]
 170pub struct StatusEntry {
 171    pub repo_path: RepoPath,
 172    pub status: FileStatus,
 173}
 174
 175impl StatusEntry {
 176    fn to_proto(&self) -> proto::StatusEntry {
 177        let simple_status = match self.status {
 178            FileStatus::Ignored | FileStatus::Untracked => proto::GitStatus::Added as i32,
 179            FileStatus::Unmerged { .. } => proto::GitStatus::Conflict as i32,
 180            FileStatus::Tracked(TrackedStatus {
 181                index_status,
 182                worktree_status,
 183            }) => tracked_status_to_proto(if worktree_status != StatusCode::Unmodified {
 184                worktree_status
 185            } else {
 186                index_status
 187            }),
 188        };
 189
 190        proto::StatusEntry {
 191            repo_path: self.repo_path.as_ref().to_proto(),
 192            simple_status,
 193            status: Some(status_to_proto(self.status)),
 194        }
 195    }
 196}
 197
 198impl TryFrom<proto::StatusEntry> for StatusEntry {
 199    type Error = anyhow::Error;
 200
 201    fn try_from(value: proto::StatusEntry) -> Result<Self, Self::Error> {
 202        let repo_path = RepoPath(Arc::<Path>::from_proto(value.repo_path));
 203        let status = status_from_proto(value.simple_status, value.status)?;
 204        Ok(Self { repo_path, status })
 205    }
 206}
 207
 208impl sum_tree::Item for StatusEntry {
 209    type Summary = PathSummary<GitSummary>;
 210
 211    fn summary(&self, _: &<Self::Summary as sum_tree::Summary>::Context) -> Self::Summary {
 212        PathSummary {
 213            max_path: self.repo_path.0.clone(),
 214            item_summary: self.status.summary(),
 215        }
 216    }
 217}
 218
 219impl sum_tree::KeyedItem for StatusEntry {
 220    type Key = PathKey;
 221
 222    fn key(&self) -> Self::Key {
 223        PathKey(self.repo_path.0.clone())
 224    }
 225}
 226
 227#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
 228pub struct RepositoryId(pub u64);
 229
 230#[derive(Clone, Debug, Default, PartialEq, Eq)]
 231pub struct MergeDetails {
 232    pub conflicted_paths: TreeSet<RepoPath>,
 233    pub message: Option<SharedString>,
 234    pub heads: Vec<Option<SharedString>>,
 235}
 236
 237#[derive(Clone, Debug, PartialEq, Eq)]
 238pub struct RepositorySnapshot {
 239    pub id: RepositoryId,
 240    pub statuses_by_path: SumTree<StatusEntry>,
 241    pub work_directory_abs_path: Arc<Path>,
 242    pub branch: Option<Branch>,
 243    pub head_commit: Option<CommitDetails>,
 244    pub scan_id: u64,
 245    pub merge: MergeDetails,
 246    pub remote_origin_url: Option<String>,
 247    pub remote_upstream_url: Option<String>,
 248}
 249
 250type JobId = u64;
 251
 252#[derive(Clone, Debug, PartialEq, Eq)]
 253pub struct JobInfo {
 254    pub start: Instant,
 255    pub message: SharedString,
 256}
 257
 258pub struct Repository {
 259    this: WeakEntity<Self>,
 260    snapshot: RepositorySnapshot,
 261    commit_message_buffer: Option<Entity<Buffer>>,
 262    git_store: WeakEntity<GitStore>,
 263    // For a local repository, holds paths that have had worktree events since the last status scan completed,
 264    // and that should be examined during the next status scan.
 265    paths_needing_status_update: BTreeSet<RepoPath>,
 266    job_sender: mpsc::UnboundedSender<GitJob>,
 267    active_jobs: HashMap<JobId, JobInfo>,
 268    job_id: JobId,
 269    askpass_delegates: Arc<Mutex<HashMap<u64, AskPassDelegate>>>,
 270    latest_askpass_id: u64,
 271}
 272
 273impl std::ops::Deref for Repository {
 274    type Target = RepositorySnapshot;
 275
 276    fn deref(&self) -> &Self::Target {
 277        &self.snapshot
 278    }
 279}
 280
 281#[derive(Clone)]
 282pub enum RepositoryState {
 283    Local {
 284        backend: Arc<dyn GitRepository>,
 285        environment: Arc<HashMap<String, String>>,
 286    },
 287    Remote {
 288        project_id: ProjectId,
 289        client: AnyProtoClient,
 290    },
 291}
 292
 293#[derive(Clone, Debug)]
 294pub enum RepositoryEvent {
 295    Updated { full_scan: bool, new_instance: bool },
 296    MergeHeadsChanged,
 297}
 298
 299#[derive(Clone, Debug)]
 300pub struct JobsUpdated;
 301
 302#[derive(Debug)]
 303pub enum GitStoreEvent {
 304    ActiveRepositoryChanged(Option<RepositoryId>),
 305    RepositoryUpdated(RepositoryId, RepositoryEvent, bool),
 306    RepositoryAdded(RepositoryId),
 307    RepositoryRemoved(RepositoryId),
 308    IndexWriteError(anyhow::Error),
 309    JobsUpdated,
 310    ConflictsUpdated,
 311}
 312
 313impl EventEmitter<RepositoryEvent> for Repository {}
 314impl EventEmitter<JobsUpdated> for Repository {}
 315impl EventEmitter<GitStoreEvent> for GitStore {}
 316
 317pub struct GitJob {
 318    job: Box<dyn FnOnce(RepositoryState, &mut AsyncApp) -> Task<()>>,
 319    key: Option<GitJobKey>,
 320}
 321
 322#[derive(PartialEq, Eq)]
 323enum GitJobKey {
 324    WriteIndex(RepoPath),
 325    ReloadBufferDiffBases,
 326    RefreshStatuses,
 327    ReloadGitState,
 328}
 329
 330impl GitStore {
 331    pub fn local(
 332        worktree_store: &Entity<WorktreeStore>,
 333        buffer_store: Entity<BufferStore>,
 334        environment: Entity<ProjectEnvironment>,
 335        fs: Arc<dyn Fs>,
 336        cx: &mut Context<Self>,
 337    ) -> Self {
 338        Self::new(
 339            worktree_store.clone(),
 340            buffer_store,
 341            GitStoreState::Local {
 342                next_repository_id: Arc::new(AtomicU64::new(1)),
 343                downstream: None,
 344                project_environment: environment,
 345                fs,
 346            },
 347            cx,
 348        )
 349    }
 350
 351    pub fn remote(
 352        worktree_store: &Entity<WorktreeStore>,
 353        buffer_store: Entity<BufferStore>,
 354        upstream_client: AnyProtoClient,
 355        project_id: u64,
 356        cx: &mut Context<Self>,
 357    ) -> Self {
 358        Self::new(
 359            worktree_store.clone(),
 360            buffer_store,
 361            GitStoreState::Remote {
 362                upstream_client,
 363                upstream_project_id: project_id,
 364                downstream: None,
 365            },
 366            cx,
 367        )
 368    }
 369
 370    fn new(
 371        worktree_store: Entity<WorktreeStore>,
 372        buffer_store: Entity<BufferStore>,
 373        state: GitStoreState,
 374        cx: &mut Context<Self>,
 375    ) -> Self {
 376        let _subscriptions = vec![
 377            cx.subscribe(&worktree_store, Self::on_worktree_store_event),
 378            cx.subscribe(&buffer_store, Self::on_buffer_store_event),
 379        ];
 380
 381        GitStore {
 382            state,
 383            buffer_store,
 384            worktree_store,
 385            repositories: HashMap::default(),
 386            active_repo_id: None,
 387            _subscriptions,
 388            loading_diffs: HashMap::default(),
 389            shared_diffs: HashMap::default(),
 390            diffs: HashMap::default(),
 391        }
 392    }
 393
 394    pub fn init(client: &AnyProtoClient) {
 395        client.add_entity_request_handler(Self::handle_get_remotes);
 396        client.add_entity_request_handler(Self::handle_get_branches);
 397        client.add_entity_request_handler(Self::handle_get_default_branch);
 398        client.add_entity_request_handler(Self::handle_change_branch);
 399        client.add_entity_request_handler(Self::handle_create_branch);
 400        client.add_entity_request_handler(Self::handle_git_init);
 401        client.add_entity_request_handler(Self::handle_push);
 402        client.add_entity_request_handler(Self::handle_pull);
 403        client.add_entity_request_handler(Self::handle_fetch);
 404        client.add_entity_request_handler(Self::handle_stage);
 405        client.add_entity_request_handler(Self::handle_unstage);
 406        client.add_entity_request_handler(Self::handle_stash);
 407        client.add_entity_request_handler(Self::handle_stash_pop);
 408        client.add_entity_request_handler(Self::handle_commit);
 409        client.add_entity_request_handler(Self::handle_reset);
 410        client.add_entity_request_handler(Self::handle_show);
 411        client.add_entity_request_handler(Self::handle_load_commit_diff);
 412        client.add_entity_request_handler(Self::handle_checkout_files);
 413        client.add_entity_request_handler(Self::handle_open_commit_message_buffer);
 414        client.add_entity_request_handler(Self::handle_set_index_text);
 415        client.add_entity_request_handler(Self::handle_askpass);
 416        client.add_entity_request_handler(Self::handle_check_for_pushed_commits);
 417        client.add_entity_request_handler(Self::handle_git_diff);
 418        client.add_entity_request_handler(Self::handle_open_unstaged_diff);
 419        client.add_entity_request_handler(Self::handle_open_uncommitted_diff);
 420        client.add_entity_message_handler(Self::handle_update_diff_bases);
 421        client.add_entity_request_handler(Self::handle_get_permalink_to_line);
 422        client.add_entity_request_handler(Self::handle_blame_buffer);
 423        client.add_entity_message_handler(Self::handle_update_repository);
 424        client.add_entity_message_handler(Self::handle_remove_repository);
 425        client.add_entity_request_handler(Self::handle_git_clone);
 426    }
 427
 428    pub fn is_local(&self) -> bool {
 429        matches!(self.state, GitStoreState::Local { .. })
 430    }
 431
 432    pub fn shared(&mut self, project_id: u64, client: AnyProtoClient, cx: &mut Context<Self>) {
 433        match &mut self.state {
 434            GitStoreState::Remote {
 435                downstream: downstream_client,
 436                ..
 437            } => {
 438                for repo in self.repositories.values() {
 439                    let update = repo.read(cx).snapshot.initial_update(project_id);
 440                    for update in split_repository_update(update) {
 441                        client.send(update).log_err();
 442                    }
 443                }
 444                *downstream_client = Some((client, ProjectId(project_id)));
 445            }
 446            GitStoreState::Local {
 447                downstream: downstream_client,
 448                ..
 449            } => {
 450                let mut snapshots = HashMap::default();
 451                let (updates_tx, mut updates_rx) = mpsc::unbounded();
 452                for repo in self.repositories.values() {
 453                    updates_tx
 454                        .unbounded_send(DownstreamUpdate::UpdateRepository(
 455                            repo.read(cx).snapshot.clone(),
 456                        ))
 457                        .ok();
 458                }
 459                *downstream_client = Some(LocalDownstreamState {
 460                    client: client.clone(),
 461                    project_id: ProjectId(project_id),
 462                    updates_tx,
 463                    _task: cx.spawn(async move |this, cx| {
 464                        cx.background_spawn(async move {
 465                            while let Some(update) = updates_rx.next().await {
 466                                match update {
 467                                    DownstreamUpdate::UpdateRepository(snapshot) => {
 468                                        if let Some(old_snapshot) = snapshots.get_mut(&snapshot.id)
 469                                        {
 470                                            let update =
 471                                                snapshot.build_update(old_snapshot, project_id);
 472                                            *old_snapshot = snapshot;
 473                                            for update in split_repository_update(update) {
 474                                                client.send(update)?;
 475                                            }
 476                                        } else {
 477                                            let update = snapshot.initial_update(project_id);
 478                                            for update in split_repository_update(update) {
 479                                                client.send(update)?;
 480                                            }
 481                                            snapshots.insert(snapshot.id, snapshot);
 482                                        }
 483                                    }
 484                                    DownstreamUpdate::RemoveRepository(id) => {
 485                                        client.send(proto::RemoveRepository {
 486                                            project_id,
 487                                            id: id.to_proto(),
 488                                        })?;
 489                                    }
 490                                }
 491                            }
 492                            anyhow::Ok(())
 493                        })
 494                        .await
 495                        .ok();
 496                        this.update(cx, |this, _| {
 497                            if let GitStoreState::Local {
 498                                downstream: downstream_client,
 499                                ..
 500                            } = &mut this.state
 501                            {
 502                                downstream_client.take();
 503                            } else {
 504                                unreachable!("unshared called on remote store");
 505                            }
 506                        })
 507                    }),
 508                });
 509            }
 510        }
 511    }
 512
 513    pub fn unshared(&mut self, _cx: &mut Context<Self>) {
 514        match &mut self.state {
 515            GitStoreState::Local {
 516                downstream: downstream_client,
 517                ..
 518            } => {
 519                downstream_client.take();
 520            }
 521            GitStoreState::Remote {
 522                downstream: downstream_client,
 523                ..
 524            } => {
 525                downstream_client.take();
 526            }
 527        }
 528        self.shared_diffs.clear();
 529    }
 530
 531    pub(crate) fn forget_shared_diffs_for(&mut self, peer_id: &proto::PeerId) {
 532        self.shared_diffs.remove(peer_id);
 533    }
 534
 535    pub fn active_repository(&self) -> Option<Entity<Repository>> {
 536        self.active_repo_id
 537            .as_ref()
 538            .map(|id| self.repositories[id].clone())
 539    }
 540
 541    pub fn open_unstaged_diff(
 542        &mut self,
 543        buffer: Entity<Buffer>,
 544        cx: &mut Context<Self>,
 545    ) -> Task<Result<Entity<BufferDiff>>> {
 546        let buffer_id = buffer.read(cx).remote_id();
 547        if let Some(diff_state) = self.diffs.get(&buffer_id)
 548            && let Some(unstaged_diff) = diff_state
 549                .read(cx)
 550                .unstaged_diff
 551                .as_ref()
 552                .and_then(|weak| weak.upgrade())
 553        {
 554            if let Some(task) =
 555                diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
 556            {
 557                return cx.background_executor().spawn(async move {
 558                    task.await;
 559                    Ok(unstaged_diff)
 560                });
 561            }
 562            return Task::ready(Ok(unstaged_diff));
 563        }
 564
 565        let Some((repo, repo_path)) =
 566            self.repository_and_path_for_buffer_id(buffer.read(cx).remote_id(), cx)
 567        else {
 568            return Task::ready(Err(anyhow!("failed to find git repository for buffer")));
 569        };
 570
 571        let task = self
 572            .loading_diffs
 573            .entry((buffer_id, DiffKind::Unstaged))
 574            .or_insert_with(|| {
 575                let staged_text = repo.update(cx, |repo, cx| {
 576                    repo.load_staged_text(buffer_id, repo_path, cx)
 577                });
 578                cx.spawn(async move |this, cx| {
 579                    Self::open_diff_internal(
 580                        this,
 581                        DiffKind::Unstaged,
 582                        staged_text.await.map(DiffBasesChange::SetIndex),
 583                        buffer,
 584                        cx,
 585                    )
 586                    .await
 587                    .map_err(Arc::new)
 588                })
 589                .shared()
 590            })
 591            .clone();
 592
 593        cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
 594    }
 595
 596    pub fn open_diff_from_default_branch(
 597        &mut self,
 598        cx: &mut Context<Self>,
 599    ) -> Task<Result<Vec<(Entity<Buffer>, Entity<BufferDiff>)>>> {
 600    }
 601
 602    pub fn open_uncommitted_diff(
 603        &mut self,
 604        buffer: Entity<Buffer>,
 605        cx: &mut Context<Self>,
 606    ) -> Task<Result<Entity<BufferDiff>>> {
 607        let buffer_id = buffer.read(cx).remote_id();
 608
 609        if let Some(diff_state) = self.diffs.get(&buffer_id)
 610            && let Some(uncommitted_diff) = diff_state
 611                .read(cx)
 612                .uncommitted_diff
 613                .as_ref()
 614                .and_then(|weak| weak.upgrade())
 615        {
 616            if let Some(task) =
 617                diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
 618            {
 619                return cx.background_executor().spawn(async move {
 620                    task.await;
 621                    Ok(uncommitted_diff)
 622                });
 623            }
 624            return Task::ready(Ok(uncommitted_diff));
 625        }
 626
 627        let Some((repo, repo_path)) =
 628            self.repository_and_path_for_buffer_id(buffer.read(cx).remote_id(), cx)
 629        else {
 630            return Task::ready(Err(anyhow!("failed to find git repository for buffer")));
 631        };
 632
 633        let task = self
 634            .loading_diffs
 635            .entry((buffer_id, DiffKind::Uncommitted))
 636            .or_insert_with(|| {
 637                let changes = repo.update(cx, |repo, cx| {
 638                    repo.load_committed_text(buffer_id, repo_path, cx)
 639                });
 640
 641                cx.spawn(async move |this, cx| {
 642                    Self::open_diff_internal(this, DiffKind::Uncommitted, changes.await, buffer, cx)
 643                        .await
 644                        .map_err(Arc::new)
 645                })
 646                .shared()
 647            })
 648            .clone();
 649
 650        cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
 651    }
 652
 653    async fn open_diff_internal(
 654        this: WeakEntity<Self>,
 655        kind: DiffKind,
 656        texts: Result<DiffBasesChange>,
 657        buffer_entity: Entity<Buffer>,
 658        cx: &mut AsyncApp,
 659    ) -> Result<Entity<BufferDiff>> {
 660        let diff_bases_change = match texts {
 661            Err(e) => {
 662                this.update(cx, |this, cx| {
 663                    let buffer = buffer_entity.read(cx);
 664                    let buffer_id = buffer.remote_id();
 665                    this.loading_diffs.remove(&(buffer_id, kind));
 666                })?;
 667                return Err(e);
 668            }
 669            Ok(change) => change,
 670        };
 671
 672        this.update(cx, |this, cx| {
 673            let buffer = buffer_entity.read(cx);
 674            let buffer_id = buffer.remote_id();
 675            let language = buffer.language().cloned();
 676            let language_registry = buffer.language_registry();
 677            let text_snapshot = buffer.text_snapshot();
 678            this.loading_diffs.remove(&(buffer_id, kind));
 679
 680            let diff_state = this
 681                .diffs
 682                .entry(buffer_id)
 683                .or_insert_with(|| cx.new(|_| BufferGitState::new()));
 684
 685            let diff = cx.new(|cx| BufferDiff::new(&text_snapshot, cx));
 686
 687            cx.subscribe(&diff, Self::on_buffer_diff_event).detach();
 688            diff_state.update(cx, |diff_state, cx| {
 689                diff_state.language = language;
 690                diff_state.language_registry = language_registry;
 691
 692                match kind {
 693                    DiffKind::Unstaged => diff_state.unstaged_diff = Some(diff.downgrade()),
 694                    DiffKind::Uncommitted => {
 695                        let unstaged_diff = if let Some(diff) = diff_state.unstaged_diff() {
 696                            diff
 697                        } else {
 698                            let unstaged_diff = cx.new(|cx| BufferDiff::new(&text_snapshot, cx));
 699                            diff_state.unstaged_diff = Some(unstaged_diff.downgrade());
 700                            unstaged_diff
 701                        };
 702
 703                        diff.update(cx, |diff, _| diff.set_secondary_diff(unstaged_diff));
 704                        diff_state.uncommitted_diff = Some(diff.downgrade())
 705                    }
 706                }
 707
 708                diff_state.diff_bases_changed(text_snapshot, Some(diff_bases_change), cx);
 709                let rx = diff_state.wait_for_recalculation();
 710
 711                anyhow::Ok(async move {
 712                    if let Some(rx) = rx {
 713                        rx.await;
 714                    }
 715                    Ok(diff)
 716                })
 717            })
 718        })??
 719        .await
 720    }
 721
 722    pub fn get_unstaged_diff(&self, buffer_id: BufferId, cx: &App) -> Option<Entity<BufferDiff>> {
 723        let diff_state = self.diffs.get(&buffer_id)?;
 724        diff_state.read(cx).unstaged_diff.as_ref()?.upgrade()
 725    }
 726
 727    pub fn get_uncommitted_diff(
 728        &self,
 729        buffer_id: BufferId,
 730        cx: &App,
 731    ) -> Option<Entity<BufferDiff>> {
 732        let diff_state = self.diffs.get(&buffer_id)?;
 733        diff_state.read(cx).uncommitted_diff.as_ref()?.upgrade()
 734    }
 735
 736    pub fn open_conflict_set(
 737        &mut self,
 738        buffer: Entity<Buffer>,
 739        cx: &mut Context<Self>,
 740    ) -> Entity<ConflictSet> {
 741        log::debug!("open conflict set");
 742        let buffer_id = buffer.read(cx).remote_id();
 743
 744        if let Some(git_state) = self.diffs.get(&buffer_id)
 745            && let Some(conflict_set) = git_state
 746                .read(cx)
 747                .conflict_set
 748                .as_ref()
 749                .and_then(|weak| weak.upgrade())
 750        {
 751            let conflict_set = conflict_set;
 752            let buffer_snapshot = buffer.read(cx).text_snapshot();
 753
 754            git_state.update(cx, |state, cx| {
 755                let _ = state.reparse_conflict_markers(buffer_snapshot, cx);
 756            });
 757
 758            return conflict_set;
 759        }
 760
 761        let is_unmerged = self
 762            .repository_and_path_for_buffer_id(buffer_id, cx)
 763            .is_some_and(|(repo, path)| repo.read(cx).snapshot.has_conflict(&path));
 764        let buffer_git_state = self
 765            .diffs
 766            .entry(buffer_id)
 767            .or_insert_with(|| cx.new(|_| BufferGitState::new()));
 768        let conflict_set = cx.new(|cx| ConflictSet::new(buffer_id, is_unmerged, cx));
 769
 770        self._subscriptions
 771            .push(cx.subscribe(&conflict_set, |_, _, _, cx| {
 772                cx.emit(GitStoreEvent::ConflictsUpdated);
 773            }));
 774
 775        buffer_git_state.update(cx, |state, cx| {
 776            state.conflict_set = Some(conflict_set.downgrade());
 777            let buffer_snapshot = buffer.read(cx).text_snapshot();
 778            let _ = state.reparse_conflict_markers(buffer_snapshot, cx);
 779        });
 780
 781        conflict_set
 782    }
 783
 784    pub fn project_path_git_status(
 785        &self,
 786        project_path: &ProjectPath,
 787        cx: &App,
 788    ) -> Option<FileStatus> {
 789        let (repo, repo_path) = self.repository_and_path_for_project_path(project_path, cx)?;
 790        Some(repo.read(cx).status_for_path(&repo_path)?.status)
 791    }
 792
 793    pub fn checkpoint(&self, cx: &mut App) -> Task<Result<GitStoreCheckpoint>> {
 794        let mut work_directory_abs_paths = Vec::new();
 795        let mut checkpoints = Vec::new();
 796        for repository in self.repositories.values() {
 797            repository.update(cx, |repository, _| {
 798                work_directory_abs_paths.push(repository.snapshot.work_directory_abs_path.clone());
 799                checkpoints.push(repository.checkpoint().map(|checkpoint| checkpoint?));
 800            });
 801        }
 802
 803        cx.background_executor().spawn(async move {
 804            let checkpoints = future::try_join_all(checkpoints).await?;
 805            Ok(GitStoreCheckpoint {
 806                checkpoints_by_work_dir_abs_path: work_directory_abs_paths
 807                    .into_iter()
 808                    .zip(checkpoints)
 809                    .collect(),
 810            })
 811        })
 812    }
 813
 814    pub fn restore_checkpoint(
 815        &self,
 816        checkpoint: GitStoreCheckpoint,
 817        cx: &mut App,
 818    ) -> Task<Result<()>> {
 819        let repositories_by_work_dir_abs_path = self
 820            .repositories
 821            .values()
 822            .map(|repo| (repo.read(cx).snapshot.work_directory_abs_path.clone(), repo))
 823            .collect::<HashMap<_, _>>();
 824
 825        let mut tasks = Vec::new();
 826        for (work_dir_abs_path, checkpoint) in checkpoint.checkpoints_by_work_dir_abs_path {
 827            if let Some(repository) = repositories_by_work_dir_abs_path.get(&work_dir_abs_path) {
 828                let restore = repository.update(cx, |repository, _| {
 829                    repository.restore_checkpoint(checkpoint)
 830                });
 831                tasks.push(async move { restore.await? });
 832            }
 833        }
 834        cx.background_spawn(async move {
 835            future::try_join_all(tasks).await?;
 836            Ok(())
 837        })
 838    }
 839
 840    /// Compares two checkpoints, returning true if they are equal.
 841    pub fn compare_checkpoints(
 842        &self,
 843        left: GitStoreCheckpoint,
 844        mut right: GitStoreCheckpoint,
 845        cx: &mut App,
 846    ) -> Task<Result<bool>> {
 847        let repositories_by_work_dir_abs_path = self
 848            .repositories
 849            .values()
 850            .map(|repo| (repo.read(cx).snapshot.work_directory_abs_path.clone(), repo))
 851            .collect::<HashMap<_, _>>();
 852
 853        let mut tasks = Vec::new();
 854        for (work_dir_abs_path, left_checkpoint) in left.checkpoints_by_work_dir_abs_path {
 855            if let Some(right_checkpoint) = right
 856                .checkpoints_by_work_dir_abs_path
 857                .remove(&work_dir_abs_path)
 858            {
 859                if let Some(repository) = repositories_by_work_dir_abs_path.get(&work_dir_abs_path)
 860                {
 861                    let compare = repository.update(cx, |repository, _| {
 862                        repository.compare_checkpoints(left_checkpoint, right_checkpoint)
 863                    });
 864
 865                    tasks.push(async move { compare.await? });
 866                }
 867            } else {
 868                return Task::ready(Ok(false));
 869            }
 870        }
 871        cx.background_spawn(async move {
 872            Ok(future::try_join_all(tasks)
 873                .await?
 874                .into_iter()
 875                .all(|result| result))
 876        })
 877    }
 878
 879    /// Blames a buffer.
 880    pub fn blame_buffer(
 881        &self,
 882        buffer: &Entity<Buffer>,
 883        version: Option<clock::Global>,
 884        cx: &mut App,
 885    ) -> Task<Result<Option<Blame>>> {
 886        let buffer = buffer.read(cx);
 887        let Some((repo, repo_path)) =
 888            self.repository_and_path_for_buffer_id(buffer.remote_id(), cx)
 889        else {
 890            return Task::ready(Err(anyhow!("failed to find a git repository for buffer")));
 891        };
 892        let content = match &version {
 893            Some(version) => buffer.rope_for_version(version),
 894            None => buffer.as_rope().clone(),
 895        };
 896        let version = version.unwrap_or(buffer.version());
 897        let buffer_id = buffer.remote_id();
 898
 899        let rx = repo.update(cx, |repo, _| {
 900            repo.send_job(None, move |state, _| async move {
 901                match state {
 902                    RepositoryState::Local { backend, .. } => backend
 903                        .blame(repo_path.clone(), content)
 904                        .await
 905                        .with_context(|| format!("Failed to blame {:?}", repo_path.0))
 906                        .map(Some),
 907                    RepositoryState::Remote { project_id, client } => {
 908                        let response = client
 909                            .request(proto::BlameBuffer {
 910                                project_id: project_id.to_proto(),
 911                                buffer_id: buffer_id.into(),
 912                                version: serialize_version(&version),
 913                            })
 914                            .await?;
 915                        Ok(deserialize_blame_buffer_response(response))
 916                    }
 917                }
 918            })
 919        });
 920
 921        cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
 922    }
 923
 924    pub fn get_permalink_to_line(
 925        &self,
 926        buffer: &Entity<Buffer>,
 927        selection: Range<u32>,
 928        cx: &mut App,
 929    ) -> Task<Result<url::Url>> {
 930        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
 931            return Task::ready(Err(anyhow!("buffer has no file")));
 932        };
 933
 934        let Some((repo, repo_path)) = self.repository_and_path_for_project_path(
 935            &(file.worktree.read(cx).id(), file.path.clone()).into(),
 936            cx,
 937        ) else {
 938            // If we're not in a Git repo, check whether this is a Rust source
 939            // file in the Cargo registry (presumably opened with go-to-definition
 940            // from a normal Rust file). If so, we can put together a permalink
 941            // using crate metadata.
 942            if buffer
 943                .read(cx)
 944                .language()
 945                .is_none_or(|lang| lang.name() != "Rust".into())
 946            {
 947                return Task::ready(Err(anyhow!("no permalink available")));
 948            }
 949            let Some(file_path) = file.worktree.read(cx).absolutize(&file.path).ok() else {
 950                return Task::ready(Err(anyhow!("no permalink available")));
 951            };
 952            return cx.spawn(async move |cx| {
 953                let provider_registry = cx.update(GitHostingProviderRegistry::default_global)?;
 954                get_permalink_in_rust_registry_src(provider_registry, file_path, selection)
 955                    .context("no permalink available")
 956            });
 957
 958            // TODO remote case
 959        };
 960
 961        let buffer_id = buffer.read(cx).remote_id();
 962        let branch = repo.read(cx).branch.clone();
 963        let remote = branch
 964            .as_ref()
 965            .and_then(|b| b.upstream.as_ref())
 966            .and_then(|b| b.remote_name())
 967            .unwrap_or("origin")
 968            .to_string();
 969
 970        let rx = repo.update(cx, |repo, _| {
 971            repo.send_job(None, move |state, cx| async move {
 972                match state {
 973                    RepositoryState::Local { backend, .. } => {
 974                        let origin_url = backend
 975                            .remote_url(&remote)
 976                            .with_context(|| format!("remote \"{remote}\" not found"))?;
 977
 978                        let sha = backend.head_sha().await.context("reading HEAD SHA")?;
 979
 980                        let provider_registry =
 981                            cx.update(GitHostingProviderRegistry::default_global)?;
 982
 983                        let (provider, remote) =
 984                            parse_git_remote_url(provider_registry, &origin_url)
 985                                .context("parsing Git remote URL")?;
 986
 987                        let path = repo_path.to_str().with_context(|| {
 988                            format!("converting repo path {repo_path:?} to string")
 989                        })?;
 990
 991                        Ok(provider.build_permalink(
 992                            remote,
 993                            BuildPermalinkParams {
 994                                sha: &sha,
 995                                path,
 996                                selection: Some(selection),
 997                            },
 998                        ))
 999                    }
1000                    RepositoryState::Remote { project_id, client } => {
1001                        let response = client
1002                            .request(proto::GetPermalinkToLine {
1003                                project_id: project_id.to_proto(),
1004                                buffer_id: buffer_id.into(),
1005                                selection: Some(proto::Range {
1006                                    start: selection.start as u64,
1007                                    end: selection.end as u64,
1008                                }),
1009                            })
1010                            .await?;
1011
1012                        url::Url::parse(&response.permalink).context("failed to parse permalink")
1013                    }
1014                }
1015            })
1016        });
1017        cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
1018    }
1019
1020    fn downstream_client(&self) -> Option<(AnyProtoClient, ProjectId)> {
1021        match &self.state {
1022            GitStoreState::Local {
1023                downstream: downstream_client,
1024                ..
1025            } => downstream_client
1026                .as_ref()
1027                .map(|state| (state.client.clone(), state.project_id)),
1028            GitStoreState::Remote {
1029                downstream: downstream_client,
1030                ..
1031            } => downstream_client.clone(),
1032        }
1033    }
1034
1035    fn upstream_client(&self) -> Option<AnyProtoClient> {
1036        match &self.state {
1037            GitStoreState::Local { .. } => None,
1038            GitStoreState::Remote {
1039                upstream_client, ..
1040            } => Some(upstream_client.clone()),
1041        }
1042    }
1043
1044    fn on_worktree_store_event(
1045        &mut self,
1046        worktree_store: Entity<WorktreeStore>,
1047        event: &WorktreeStoreEvent,
1048        cx: &mut Context<Self>,
1049    ) {
1050        let GitStoreState::Local {
1051            project_environment,
1052            downstream,
1053            next_repository_id,
1054            fs,
1055        } = &self.state
1056        else {
1057            return;
1058        };
1059
1060        match event {
1061            WorktreeStoreEvent::WorktreeUpdatedEntries(worktree_id, updated_entries) => {
1062                if let Some(worktree) = self
1063                    .worktree_store
1064                    .read(cx)
1065                    .worktree_for_id(*worktree_id, cx)
1066                {
1067                    let paths_by_git_repo =
1068                        self.process_updated_entries(&worktree, updated_entries, cx);
1069                    let downstream = downstream
1070                        .as_ref()
1071                        .map(|downstream| downstream.updates_tx.clone());
1072                    cx.spawn(async move |_, cx| {
1073                        let paths_by_git_repo = paths_by_git_repo.await;
1074                        for (repo, paths) in paths_by_git_repo {
1075                            repo.update(cx, |repo, cx| {
1076                                repo.paths_changed(paths, downstream.clone(), cx);
1077                            })
1078                            .ok();
1079                        }
1080                    })
1081                    .detach();
1082                }
1083            }
1084            WorktreeStoreEvent::WorktreeUpdatedGitRepositories(worktree_id, changed_repos) => {
1085                let Some(worktree) = worktree_store.read(cx).worktree_for_id(*worktree_id, cx)
1086                else {
1087                    return;
1088                };
1089                if !worktree.read(cx).is_visible() {
1090                    log::debug!(
1091                        "not adding repositories for local worktree {:?} because it's not visible",
1092                        worktree.read(cx).abs_path()
1093                    );
1094                    return;
1095                }
1096                self.update_repositories_from_worktree(
1097                    project_environment.clone(),
1098                    next_repository_id.clone(),
1099                    downstream
1100                        .as_ref()
1101                        .map(|downstream| downstream.updates_tx.clone()),
1102                    changed_repos.clone(),
1103                    fs.clone(),
1104                    cx,
1105                );
1106                self.local_worktree_git_repos_changed(worktree, changed_repos, cx);
1107            }
1108            _ => {}
1109        }
1110    }
1111
1112    fn on_repository_event(
1113        &mut self,
1114        repo: Entity<Repository>,
1115        event: &RepositoryEvent,
1116        cx: &mut Context<Self>,
1117    ) {
1118        let id = repo.read(cx).id;
1119        let repo_snapshot = repo.read(cx).snapshot.clone();
1120        for (buffer_id, diff) in self.diffs.iter() {
1121            if let Some((buffer_repo, repo_path)) =
1122                self.repository_and_path_for_buffer_id(*buffer_id, cx)
1123                && buffer_repo == repo
1124            {
1125                diff.update(cx, |diff, cx| {
1126                    if let Some(conflict_set) = &diff.conflict_set {
1127                        let conflict_status_changed =
1128                            conflict_set.update(cx, |conflict_set, cx| {
1129                                let has_conflict = repo_snapshot.has_conflict(&repo_path);
1130                                conflict_set.set_has_conflict(has_conflict, cx)
1131                            })?;
1132                        if conflict_status_changed {
1133                            let buffer_store = self.buffer_store.read(cx);
1134                            if let Some(buffer) = buffer_store.get(*buffer_id) {
1135                                let _ = diff
1136                                    .reparse_conflict_markers(buffer.read(cx).text_snapshot(), cx);
1137                            }
1138                        }
1139                    }
1140                    anyhow::Ok(())
1141                })
1142                .ok();
1143            }
1144        }
1145        cx.emit(GitStoreEvent::RepositoryUpdated(
1146            id,
1147            event.clone(),
1148            self.active_repo_id == Some(id),
1149        ))
1150    }
1151
1152    fn on_jobs_updated(&mut self, _: Entity<Repository>, _: &JobsUpdated, cx: &mut Context<Self>) {
1153        cx.emit(GitStoreEvent::JobsUpdated)
1154    }
1155
1156    /// Update our list of repositories and schedule git scans in response to a notification from a worktree,
1157    fn update_repositories_from_worktree(
1158        &mut self,
1159        project_environment: Entity<ProjectEnvironment>,
1160        next_repository_id: Arc<AtomicU64>,
1161        updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
1162        updated_git_repositories: UpdatedGitRepositoriesSet,
1163        fs: Arc<dyn Fs>,
1164        cx: &mut Context<Self>,
1165    ) {
1166        let mut removed_ids = Vec::new();
1167        for update in updated_git_repositories.iter() {
1168            if let Some((id, existing)) = self.repositories.iter().find(|(_, repo)| {
1169                let existing_work_directory_abs_path =
1170                    repo.read(cx).work_directory_abs_path.clone();
1171                Some(&existing_work_directory_abs_path)
1172                    == update.old_work_directory_abs_path.as_ref()
1173                    || Some(&existing_work_directory_abs_path)
1174                        == update.new_work_directory_abs_path.as_ref()
1175            }) {
1176                if let Some(new_work_directory_abs_path) =
1177                    update.new_work_directory_abs_path.clone()
1178                {
1179                    existing.update(cx, |existing, cx| {
1180                        existing.snapshot.work_directory_abs_path = new_work_directory_abs_path;
1181                        existing.schedule_scan(updates_tx.clone(), cx);
1182                    });
1183                } else {
1184                    removed_ids.push(*id);
1185                }
1186            } else if let UpdatedGitRepository {
1187                new_work_directory_abs_path: Some(work_directory_abs_path),
1188                dot_git_abs_path: Some(dot_git_abs_path),
1189                repository_dir_abs_path: Some(repository_dir_abs_path),
1190                common_dir_abs_path: Some(common_dir_abs_path),
1191                ..
1192            } = update
1193            {
1194                let id = RepositoryId(next_repository_id.fetch_add(1, atomic::Ordering::Release));
1195                let git_store = cx.weak_entity();
1196                let repo = cx.new(|cx| {
1197                    let mut repo = Repository::local(
1198                        id,
1199                        work_directory_abs_path.clone(),
1200                        dot_git_abs_path.clone(),
1201                        repository_dir_abs_path.clone(),
1202                        common_dir_abs_path.clone(),
1203                        project_environment.downgrade(),
1204                        fs.clone(),
1205                        git_store,
1206                        cx,
1207                    );
1208                    repo.schedule_scan(updates_tx.clone(), cx);
1209                    repo
1210                });
1211                self._subscriptions
1212                    .push(cx.subscribe(&repo, Self::on_repository_event));
1213                self._subscriptions
1214                    .push(cx.subscribe(&repo, Self::on_jobs_updated));
1215                self.repositories.insert(id, repo);
1216                cx.emit(GitStoreEvent::RepositoryAdded(id));
1217                self.active_repo_id.get_or_insert_with(|| {
1218                    cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
1219                    id
1220                });
1221            }
1222        }
1223
1224        for id in removed_ids {
1225            if self.active_repo_id == Some(id) {
1226                self.active_repo_id = None;
1227                cx.emit(GitStoreEvent::ActiveRepositoryChanged(None));
1228            }
1229            self.repositories.remove(&id);
1230            if let Some(updates_tx) = updates_tx.as_ref() {
1231                updates_tx
1232                    .unbounded_send(DownstreamUpdate::RemoveRepository(id))
1233                    .ok();
1234            }
1235        }
1236    }
1237
1238    fn on_buffer_store_event(
1239        &mut self,
1240        _: Entity<BufferStore>,
1241        event: &BufferStoreEvent,
1242        cx: &mut Context<Self>,
1243    ) {
1244        match event {
1245            BufferStoreEvent::BufferAdded(buffer) => {
1246                cx.subscribe(buffer, |this, buffer, event, cx| {
1247                    if let BufferEvent::LanguageChanged = event {
1248                        let buffer_id = buffer.read(cx).remote_id();
1249                        if let Some(diff_state) = this.diffs.get(&buffer_id) {
1250                            diff_state.update(cx, |diff_state, cx| {
1251                                diff_state.buffer_language_changed(buffer, cx);
1252                            });
1253                        }
1254                    }
1255                })
1256                .detach();
1257            }
1258            BufferStoreEvent::SharedBufferClosed(peer_id, buffer_id) => {
1259                if let Some(diffs) = self.shared_diffs.get_mut(peer_id) {
1260                    diffs.remove(buffer_id);
1261                }
1262            }
1263            BufferStoreEvent::BufferDropped(buffer_id) => {
1264                self.diffs.remove(buffer_id);
1265                for diffs in self.shared_diffs.values_mut() {
1266                    diffs.remove(buffer_id);
1267                }
1268            }
1269
1270            _ => {}
1271        }
1272    }
1273
1274    pub fn recalculate_buffer_diffs(
1275        &mut self,
1276        buffers: Vec<Entity<Buffer>>,
1277        cx: &mut Context<Self>,
1278    ) -> impl Future<Output = ()> + use<> {
1279        let mut futures = Vec::new();
1280        for buffer in buffers {
1281            if let Some(diff_state) = self.diffs.get_mut(&buffer.read(cx).remote_id()) {
1282                let buffer = buffer.read(cx).text_snapshot();
1283                diff_state.update(cx, |diff_state, cx| {
1284                    diff_state.recalculate_diffs(buffer.clone(), cx);
1285                    futures.extend(diff_state.wait_for_recalculation().map(FutureExt::boxed));
1286                });
1287                futures.push(diff_state.update(cx, |diff_state, cx| {
1288                    diff_state
1289                        .reparse_conflict_markers(buffer, cx)
1290                        .map(|_| {})
1291                        .boxed()
1292                }));
1293            }
1294        }
1295        async move {
1296            futures::future::join_all(futures).await;
1297        }
1298    }
1299
1300    fn on_buffer_diff_event(
1301        &mut self,
1302        diff: Entity<buffer_diff::BufferDiff>,
1303        event: &BufferDiffEvent,
1304        cx: &mut Context<Self>,
1305    ) {
1306        if let BufferDiffEvent::HunksStagedOrUnstaged(new_index_text) = event {
1307            let buffer_id = diff.read(cx).buffer_id;
1308            if let Some(diff_state) = self.diffs.get(&buffer_id) {
1309                let hunk_staging_operation_count = diff_state.update(cx, |diff_state, _| {
1310                    diff_state.hunk_staging_operation_count += 1;
1311                    diff_state.hunk_staging_operation_count
1312                });
1313                if let Some((repo, path)) = self.repository_and_path_for_buffer_id(buffer_id, cx) {
1314                    let recv = repo.update(cx, |repo, cx| {
1315                        log::debug!("hunks changed for {}", path.display());
1316                        repo.spawn_set_index_text_job(
1317                            path,
1318                            new_index_text.as_ref().map(|rope| rope.to_string()),
1319                            Some(hunk_staging_operation_count),
1320                            cx,
1321                        )
1322                    });
1323                    let diff = diff.downgrade();
1324                    cx.spawn(async move |this, cx| {
1325                        if let Ok(Err(error)) = cx.background_spawn(recv).await {
1326                            diff.update(cx, |diff, cx| {
1327                                diff.clear_pending_hunks(cx);
1328                            })
1329                            .ok();
1330                            this.update(cx, |_, cx| cx.emit(GitStoreEvent::IndexWriteError(error)))
1331                                .ok();
1332                        }
1333                    })
1334                    .detach();
1335                }
1336            }
1337        }
1338    }
1339
1340    fn local_worktree_git_repos_changed(
1341        &mut self,
1342        worktree: Entity<Worktree>,
1343        changed_repos: &UpdatedGitRepositoriesSet,
1344        cx: &mut Context<Self>,
1345    ) {
1346        log::debug!("local worktree repos changed");
1347        debug_assert!(worktree.read(cx).is_local());
1348
1349        for repository in self.repositories.values() {
1350            repository.update(cx, |repository, cx| {
1351                let repo_abs_path = &repository.work_directory_abs_path;
1352                if changed_repos.iter().any(|update| {
1353                    update.old_work_directory_abs_path.as_ref() == Some(repo_abs_path)
1354                        || update.new_work_directory_abs_path.as_ref() == Some(repo_abs_path)
1355                }) {
1356                    repository.reload_buffer_diff_bases(cx);
1357                }
1358            });
1359        }
1360    }
1361
1362    pub fn repositories(&self) -> &HashMap<RepositoryId, Entity<Repository>> {
1363        &self.repositories
1364    }
1365
1366    pub fn status_for_buffer_id(&self, buffer_id: BufferId, cx: &App) -> Option<FileStatus> {
1367        let (repo, path) = self.repository_and_path_for_buffer_id(buffer_id, cx)?;
1368        let status = repo.read(cx).snapshot.status_for_path(&path)?;
1369        Some(status.status)
1370    }
1371
1372    pub fn repository_and_path_for_buffer_id(
1373        &self,
1374        buffer_id: BufferId,
1375        cx: &App,
1376    ) -> Option<(Entity<Repository>, RepoPath)> {
1377        let buffer = self.buffer_store.read(cx).get(buffer_id)?;
1378        let project_path = buffer.read(cx).project_path(cx)?;
1379        self.repository_and_path_for_project_path(&project_path, cx)
1380    }
1381
1382    pub fn repository_and_path_for_project_path(
1383        &self,
1384        path: &ProjectPath,
1385        cx: &App,
1386    ) -> Option<(Entity<Repository>, RepoPath)> {
1387        let abs_path = self.worktree_store.read(cx).absolutize(path, cx)?;
1388        self.repositories
1389            .values()
1390            .filter_map(|repo| {
1391                let repo_path = repo.read(cx).abs_path_to_repo_path(&abs_path)?;
1392                Some((repo.clone(), repo_path))
1393            })
1394            .max_by_key(|(repo, _)| repo.read(cx).work_directory_abs_path.clone())
1395    }
1396
1397    pub fn git_init(
1398        &self,
1399        path: Arc<Path>,
1400        fallback_branch_name: String,
1401        cx: &App,
1402    ) -> Task<Result<()>> {
1403        match &self.state {
1404            GitStoreState::Local { fs, .. } => {
1405                let fs = fs.clone();
1406                cx.background_executor()
1407                    .spawn(async move { fs.git_init(&path, fallback_branch_name) })
1408            }
1409            GitStoreState::Remote {
1410                upstream_client,
1411                upstream_project_id: project_id,
1412                ..
1413            } => {
1414                let client = upstream_client.clone();
1415                let project_id = *project_id;
1416                cx.background_executor().spawn(async move {
1417                    client
1418                        .request(proto::GitInit {
1419                            project_id: project_id,
1420                            abs_path: path.to_string_lossy().to_string(),
1421                            fallback_branch_name,
1422                        })
1423                        .await?;
1424                    Ok(())
1425                })
1426            }
1427        }
1428    }
1429
1430    pub fn git_clone(
1431        &self,
1432        repo: String,
1433        path: impl Into<Arc<std::path::Path>>,
1434        cx: &App,
1435    ) -> Task<Result<()>> {
1436        let path = path.into();
1437        match &self.state {
1438            GitStoreState::Local { fs, .. } => {
1439                let fs = fs.clone();
1440                cx.background_executor()
1441                    .spawn(async move { fs.git_clone(&repo, &path).await })
1442            }
1443            GitStoreState::Remote {
1444                upstream_client,
1445                upstream_project_id,
1446                ..
1447            } => {
1448                if upstream_client.is_via_collab() {
1449                    return Task::ready(Err(anyhow!(
1450                        "Git Clone isn't supported for project guests"
1451                    )));
1452                }
1453                let request = upstream_client.request(proto::GitClone {
1454                    project_id: *upstream_project_id,
1455                    abs_path: path.to_string_lossy().to_string(),
1456                    remote_repo: repo,
1457                });
1458
1459                cx.background_spawn(async move {
1460                    let result = request.await?;
1461
1462                    match result.success {
1463                        true => Ok(()),
1464                        false => Err(anyhow!("Git Clone failed")),
1465                    }
1466                })
1467            }
1468        }
1469    }
1470
1471    async fn handle_update_repository(
1472        this: Entity<Self>,
1473        envelope: TypedEnvelope<proto::UpdateRepository>,
1474        mut cx: AsyncApp,
1475    ) -> Result<()> {
1476        this.update(&mut cx, |this, cx| {
1477            let mut update = envelope.payload;
1478
1479            let id = RepositoryId::from_proto(update.id);
1480            let client = this.upstream_client().context("no upstream client")?;
1481
1482            let mut is_new = false;
1483            let repo = this.repositories.entry(id).or_insert_with(|| {
1484                is_new = true;
1485                let git_store = cx.weak_entity();
1486                cx.new(|cx| {
1487                    Repository::remote(
1488                        id,
1489                        Path::new(&update.abs_path).into(),
1490                        ProjectId(update.project_id),
1491                        client,
1492                        git_store,
1493                        cx,
1494                    )
1495                })
1496            });
1497            if is_new {
1498                this._subscriptions
1499                    .push(cx.subscribe(repo, Self::on_repository_event))
1500            }
1501
1502            repo.update(cx, {
1503                let update = update.clone();
1504                |repo, cx| repo.apply_remote_update(update, is_new, cx)
1505            })?;
1506
1507            this.active_repo_id.get_or_insert_with(|| {
1508                cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
1509                id
1510            });
1511
1512            if let Some((client, project_id)) = this.downstream_client() {
1513                update.project_id = project_id.to_proto();
1514                client.send(update).log_err();
1515            }
1516            Ok(())
1517        })?
1518    }
1519
1520    async fn handle_remove_repository(
1521        this: Entity<Self>,
1522        envelope: TypedEnvelope<proto::RemoveRepository>,
1523        mut cx: AsyncApp,
1524    ) -> Result<()> {
1525        this.update(&mut cx, |this, cx| {
1526            let mut update = envelope.payload;
1527            let id = RepositoryId::from_proto(update.id);
1528            this.repositories.remove(&id);
1529            if let Some((client, project_id)) = this.downstream_client() {
1530                update.project_id = project_id.to_proto();
1531                client.send(update).log_err();
1532            }
1533            if this.active_repo_id == Some(id) {
1534                this.active_repo_id = None;
1535                cx.emit(GitStoreEvent::ActiveRepositoryChanged(None));
1536            }
1537            cx.emit(GitStoreEvent::RepositoryRemoved(id));
1538        })
1539    }
1540
1541    async fn handle_git_init(
1542        this: Entity<Self>,
1543        envelope: TypedEnvelope<proto::GitInit>,
1544        cx: AsyncApp,
1545    ) -> Result<proto::Ack> {
1546        let path: Arc<Path> = PathBuf::from(envelope.payload.abs_path).into();
1547        let name = envelope.payload.fallback_branch_name;
1548        cx.update(|cx| this.read(cx).git_init(path, name, cx))?
1549            .await?;
1550
1551        Ok(proto::Ack {})
1552    }
1553
1554    async fn handle_git_clone(
1555        this: Entity<Self>,
1556        envelope: TypedEnvelope<proto::GitClone>,
1557        cx: AsyncApp,
1558    ) -> Result<proto::GitCloneResponse> {
1559        let path: Arc<Path> = PathBuf::from(envelope.payload.abs_path).into();
1560        let repo_name = envelope.payload.remote_repo;
1561        let result = cx
1562            .update(|cx| this.read(cx).git_clone(repo_name, path, cx))?
1563            .await;
1564
1565        Ok(proto::GitCloneResponse {
1566            success: result.is_ok(),
1567        })
1568    }
1569
1570    async fn handle_fetch(
1571        this: Entity<Self>,
1572        envelope: TypedEnvelope<proto::Fetch>,
1573        mut cx: AsyncApp,
1574    ) -> Result<proto::RemoteMessageResponse> {
1575        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1576        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1577        let fetch_options = FetchOptions::from_proto(envelope.payload.remote);
1578        let askpass_id = envelope.payload.askpass_id;
1579
1580        let askpass = make_remote_delegate(
1581            this,
1582            envelope.payload.project_id,
1583            repository_id,
1584            askpass_id,
1585            &mut cx,
1586        );
1587
1588        let remote_output = repository_handle
1589            .update(&mut cx, |repository_handle, cx| {
1590                repository_handle.fetch(fetch_options, askpass, cx)
1591            })?
1592            .await??;
1593
1594        Ok(proto::RemoteMessageResponse {
1595            stdout: remote_output.stdout,
1596            stderr: remote_output.stderr,
1597        })
1598    }
1599
1600    async fn handle_push(
1601        this: Entity<Self>,
1602        envelope: TypedEnvelope<proto::Push>,
1603        mut cx: AsyncApp,
1604    ) -> Result<proto::RemoteMessageResponse> {
1605        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1606        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1607
1608        let askpass_id = envelope.payload.askpass_id;
1609        let askpass = make_remote_delegate(
1610            this,
1611            envelope.payload.project_id,
1612            repository_id,
1613            askpass_id,
1614            &mut cx,
1615        );
1616
1617        let options = envelope
1618            .payload
1619            .options
1620            .as_ref()
1621            .map(|_| match envelope.payload.options() {
1622                proto::push::PushOptions::SetUpstream => git::repository::PushOptions::SetUpstream,
1623                proto::push::PushOptions::Force => git::repository::PushOptions::Force,
1624            });
1625
1626        let branch_name = envelope.payload.branch_name.into();
1627        let remote_name = envelope.payload.remote_name.into();
1628
1629        let remote_output = repository_handle
1630            .update(&mut cx, |repository_handle, cx| {
1631                repository_handle.push(branch_name, remote_name, options, askpass, cx)
1632            })?
1633            .await??;
1634        Ok(proto::RemoteMessageResponse {
1635            stdout: remote_output.stdout,
1636            stderr: remote_output.stderr,
1637        })
1638    }
1639
1640    async fn handle_pull(
1641        this: Entity<Self>,
1642        envelope: TypedEnvelope<proto::Pull>,
1643        mut cx: AsyncApp,
1644    ) -> Result<proto::RemoteMessageResponse> {
1645        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1646        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1647        let askpass_id = envelope.payload.askpass_id;
1648        let askpass = make_remote_delegate(
1649            this,
1650            envelope.payload.project_id,
1651            repository_id,
1652            askpass_id,
1653            &mut cx,
1654        );
1655
1656        let branch_name = envelope.payload.branch_name.into();
1657        let remote_name = envelope.payload.remote_name.into();
1658
1659        let remote_message = repository_handle
1660            .update(&mut cx, |repository_handle, cx| {
1661                repository_handle.pull(branch_name, remote_name, askpass, cx)
1662            })?
1663            .await??;
1664
1665        Ok(proto::RemoteMessageResponse {
1666            stdout: remote_message.stdout,
1667            stderr: remote_message.stderr,
1668        })
1669    }
1670
1671    async fn handle_stage(
1672        this: Entity<Self>,
1673        envelope: TypedEnvelope<proto::Stage>,
1674        mut cx: AsyncApp,
1675    ) -> Result<proto::Ack> {
1676        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1677        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1678
1679        let entries = envelope
1680            .payload
1681            .paths
1682            .into_iter()
1683            .map(PathBuf::from)
1684            .map(RepoPath::new)
1685            .collect();
1686
1687        repository_handle
1688            .update(&mut cx, |repository_handle, cx| {
1689                repository_handle.stage_entries(entries, cx)
1690            })?
1691            .await?;
1692        Ok(proto::Ack {})
1693    }
1694
1695    async fn handle_unstage(
1696        this: Entity<Self>,
1697        envelope: TypedEnvelope<proto::Unstage>,
1698        mut cx: AsyncApp,
1699    ) -> Result<proto::Ack> {
1700        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1701        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1702
1703        let entries = envelope
1704            .payload
1705            .paths
1706            .into_iter()
1707            .map(PathBuf::from)
1708            .map(RepoPath::new)
1709            .collect();
1710
1711        repository_handle
1712            .update(&mut cx, |repository_handle, cx| {
1713                repository_handle.unstage_entries(entries, cx)
1714            })?
1715            .await?;
1716
1717        Ok(proto::Ack {})
1718    }
1719
1720    async fn handle_stash(
1721        this: Entity<Self>,
1722        envelope: TypedEnvelope<proto::Stash>,
1723        mut cx: AsyncApp,
1724    ) -> Result<proto::Ack> {
1725        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1726        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1727
1728        let entries = envelope
1729            .payload
1730            .paths
1731            .into_iter()
1732            .map(PathBuf::from)
1733            .map(RepoPath::new)
1734            .collect();
1735
1736        repository_handle
1737            .update(&mut cx, |repository_handle, cx| {
1738                repository_handle.stash_entries(entries, cx)
1739            })?
1740            .await?;
1741
1742        Ok(proto::Ack {})
1743    }
1744
1745    async fn handle_stash_pop(
1746        this: Entity<Self>,
1747        envelope: TypedEnvelope<proto::StashPop>,
1748        mut cx: AsyncApp,
1749    ) -> Result<proto::Ack> {
1750        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1751        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1752
1753        repository_handle
1754            .update(&mut cx, |repository_handle, cx| {
1755                repository_handle.stash_pop(cx)
1756            })?
1757            .await?;
1758
1759        Ok(proto::Ack {})
1760    }
1761
1762    async fn handle_set_index_text(
1763        this: Entity<Self>,
1764        envelope: TypedEnvelope<proto::SetIndexText>,
1765        mut cx: AsyncApp,
1766    ) -> Result<proto::Ack> {
1767        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1768        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1769        let repo_path = RepoPath::from_str(&envelope.payload.path);
1770
1771        repository_handle
1772            .update(&mut cx, |repository_handle, cx| {
1773                repository_handle.spawn_set_index_text_job(
1774                    repo_path,
1775                    envelope.payload.text,
1776                    None,
1777                    cx,
1778                )
1779            })?
1780            .await??;
1781        Ok(proto::Ack {})
1782    }
1783
1784    async fn handle_commit(
1785        this: Entity<Self>,
1786        envelope: TypedEnvelope<proto::Commit>,
1787        mut cx: AsyncApp,
1788    ) -> Result<proto::Ack> {
1789        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1790        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1791
1792        let message = SharedString::from(envelope.payload.message);
1793        let name = envelope.payload.name.map(SharedString::from);
1794        let email = envelope.payload.email.map(SharedString::from);
1795        let options = envelope.payload.options.unwrap_or_default();
1796
1797        repository_handle
1798            .update(&mut cx, |repository_handle, cx| {
1799                repository_handle.commit(
1800                    message,
1801                    name.zip(email),
1802                    CommitOptions {
1803                        amend: options.amend,
1804                        signoff: options.signoff,
1805                    },
1806                    cx,
1807                )
1808            })?
1809            .await??;
1810        Ok(proto::Ack {})
1811    }
1812
1813    async fn handle_get_remotes(
1814        this: Entity<Self>,
1815        envelope: TypedEnvelope<proto::GetRemotes>,
1816        mut cx: AsyncApp,
1817    ) -> Result<proto::GetRemotesResponse> {
1818        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1819        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1820
1821        let branch_name = envelope.payload.branch_name;
1822
1823        let remotes = repository_handle
1824            .update(&mut cx, |repository_handle, _| {
1825                repository_handle.get_remotes(branch_name)
1826            })?
1827            .await??;
1828
1829        Ok(proto::GetRemotesResponse {
1830            remotes: remotes
1831                .into_iter()
1832                .map(|remotes| proto::get_remotes_response::Remote {
1833                    name: remotes.name.to_string(),
1834                })
1835                .collect::<Vec<_>>(),
1836        })
1837    }
1838
1839    async fn handle_get_branches(
1840        this: Entity<Self>,
1841        envelope: TypedEnvelope<proto::GitGetBranches>,
1842        mut cx: AsyncApp,
1843    ) -> Result<proto::GitBranchesResponse> {
1844        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1845        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1846
1847        let branches = repository_handle
1848            .update(&mut cx, |repository_handle, _| repository_handle.branches())?
1849            .await??;
1850
1851        Ok(proto::GitBranchesResponse {
1852            branches: branches
1853                .into_iter()
1854                .map(|branch| branch_to_proto(&branch))
1855                .collect::<Vec<_>>(),
1856        })
1857    }
1858    async fn handle_get_default_branch(
1859        this: Entity<Self>,
1860        envelope: TypedEnvelope<proto::GetDefaultBranch>,
1861        mut cx: AsyncApp,
1862    ) -> Result<proto::GetDefaultBranchResponse> {
1863        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1864        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1865
1866        let branch = repository_handle
1867            .update(&mut cx, |repository_handle, _| {
1868                repository_handle.default_branch()
1869            })?
1870            .await??
1871            .map(Into::into);
1872
1873        Ok(proto::GetDefaultBranchResponse { branch })
1874    }
1875    async fn handle_create_branch(
1876        this: Entity<Self>,
1877        envelope: TypedEnvelope<proto::GitCreateBranch>,
1878        mut cx: AsyncApp,
1879    ) -> Result<proto::Ack> {
1880        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1881        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1882        let branch_name = envelope.payload.branch_name;
1883
1884        repository_handle
1885            .update(&mut cx, |repository_handle, _| {
1886                repository_handle.create_branch(branch_name)
1887            })?
1888            .await??;
1889
1890        Ok(proto::Ack {})
1891    }
1892
1893    async fn handle_change_branch(
1894        this: Entity<Self>,
1895        envelope: TypedEnvelope<proto::GitChangeBranch>,
1896        mut cx: AsyncApp,
1897    ) -> Result<proto::Ack> {
1898        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1899        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1900        let branch_name = envelope.payload.branch_name;
1901
1902        repository_handle
1903            .update(&mut cx, |repository_handle, _| {
1904                repository_handle.change_branch(branch_name)
1905            })?
1906            .await??;
1907
1908        Ok(proto::Ack {})
1909    }
1910
1911    async fn handle_show(
1912        this: Entity<Self>,
1913        envelope: TypedEnvelope<proto::GitShow>,
1914        mut cx: AsyncApp,
1915    ) -> Result<proto::GitCommitDetails> {
1916        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1917        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1918
1919        let commit = repository_handle
1920            .update(&mut cx, |repository_handle, _| {
1921                repository_handle.show(envelope.payload.commit)
1922            })?
1923            .await??;
1924        Ok(proto::GitCommitDetails {
1925            sha: commit.sha.into(),
1926            message: commit.message.into(),
1927            commit_timestamp: commit.commit_timestamp,
1928            author_email: commit.author_email.into(),
1929            author_name: commit.author_name.into(),
1930        })
1931    }
1932
1933    async fn handle_load_commit_diff(
1934        this: Entity<Self>,
1935        envelope: TypedEnvelope<proto::LoadCommitDiff>,
1936        mut cx: AsyncApp,
1937    ) -> Result<proto::LoadCommitDiffResponse> {
1938        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1939        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1940
1941        let commit_diff = repository_handle
1942            .update(&mut cx, |repository_handle, _| {
1943                repository_handle.load_commit_diff(envelope.payload.commit)
1944            })?
1945            .await??;
1946        Ok(proto::LoadCommitDiffResponse {
1947            files: commit_diff
1948                .files
1949                .into_iter()
1950                .map(|file| proto::CommitFile {
1951                    path: file.path.to_string(),
1952                    old_text: file.old_text,
1953                    new_text: file.new_text,
1954                })
1955                .collect(),
1956        })
1957    }
1958
1959    async fn handle_reset(
1960        this: Entity<Self>,
1961        envelope: TypedEnvelope<proto::GitReset>,
1962        mut cx: AsyncApp,
1963    ) -> Result<proto::Ack> {
1964        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1965        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1966
1967        let mode = match envelope.payload.mode() {
1968            git_reset::ResetMode::Soft => ResetMode::Soft,
1969            git_reset::ResetMode::Mixed => ResetMode::Mixed,
1970        };
1971
1972        repository_handle
1973            .update(&mut cx, |repository_handle, cx| {
1974                repository_handle.reset(envelope.payload.commit, mode, cx)
1975            })?
1976            .await??;
1977        Ok(proto::Ack {})
1978    }
1979
1980    async fn handle_checkout_files(
1981        this: Entity<Self>,
1982        envelope: TypedEnvelope<proto::GitCheckoutFiles>,
1983        mut cx: AsyncApp,
1984    ) -> Result<proto::Ack> {
1985        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1986        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1987        let paths = envelope
1988            .payload
1989            .paths
1990            .iter()
1991            .map(|s| RepoPath::from_str(s))
1992            .collect();
1993
1994        repository_handle
1995            .update(&mut cx, |repository_handle, cx| {
1996                repository_handle.checkout_files(&envelope.payload.commit, paths, cx)
1997            })?
1998            .await??;
1999        Ok(proto::Ack {})
2000    }
2001
2002    async fn handle_open_commit_message_buffer(
2003        this: Entity<Self>,
2004        envelope: TypedEnvelope<proto::OpenCommitMessageBuffer>,
2005        mut cx: AsyncApp,
2006    ) -> Result<proto::OpenBufferResponse> {
2007        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2008        let repository = Self::repository_for_request(&this, repository_id, &mut cx)?;
2009        let buffer = repository
2010            .update(&mut cx, |repository, cx| {
2011                repository.open_commit_buffer(None, this.read(cx).buffer_store.clone(), cx)
2012            })?
2013            .await?;
2014
2015        let buffer_id = buffer.read_with(&cx, |buffer, _| buffer.remote_id())?;
2016        this.update(&mut cx, |this, cx| {
2017            this.buffer_store.update(cx, |buffer_store, cx| {
2018                buffer_store
2019                    .create_buffer_for_peer(
2020                        &buffer,
2021                        envelope.original_sender_id.unwrap_or(envelope.sender_id),
2022                        cx,
2023                    )
2024                    .detach_and_log_err(cx);
2025            })
2026        })?;
2027
2028        Ok(proto::OpenBufferResponse {
2029            buffer_id: buffer_id.to_proto(),
2030        })
2031    }
2032
2033    async fn handle_askpass(
2034        this: Entity<Self>,
2035        envelope: TypedEnvelope<proto::AskPassRequest>,
2036        mut cx: AsyncApp,
2037    ) -> Result<proto::AskPassResponse> {
2038        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2039        let repository = Self::repository_for_request(&this, repository_id, &mut cx)?;
2040
2041        let delegates = cx.update(|cx| repository.read(cx).askpass_delegates.clone())?;
2042        let Some(mut askpass) = delegates.lock().remove(&envelope.payload.askpass_id) else {
2043            debug_panic!("no askpass found");
2044            anyhow::bail!("no askpass found");
2045        };
2046
2047        let response = askpass.ask_password(envelope.payload.prompt).await?;
2048
2049        delegates
2050            .lock()
2051            .insert(envelope.payload.askpass_id, askpass);
2052
2053        Ok(proto::AskPassResponse { response })
2054    }
2055
2056    async fn handle_check_for_pushed_commits(
2057        this: Entity<Self>,
2058        envelope: TypedEnvelope<proto::CheckForPushedCommits>,
2059        mut cx: AsyncApp,
2060    ) -> Result<proto::CheckForPushedCommitsResponse> {
2061        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2062        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2063
2064        let branches = repository_handle
2065            .update(&mut cx, |repository_handle, _| {
2066                repository_handle.check_for_pushed_commits()
2067            })?
2068            .await??;
2069        Ok(proto::CheckForPushedCommitsResponse {
2070            pushed_to: branches
2071                .into_iter()
2072                .map(|commit| commit.to_string())
2073                .collect(),
2074        })
2075    }
2076
2077    async fn handle_git_diff(
2078        this: Entity<Self>,
2079        envelope: TypedEnvelope<proto::GitDiff>,
2080        mut cx: AsyncApp,
2081    ) -> Result<proto::GitDiffResponse> {
2082        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2083        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2084        let diff_type = match envelope.payload.diff_type() {
2085            proto::git_diff::DiffType::HeadToIndex => DiffType::HeadToIndex,
2086            proto::git_diff::DiffType::HeadToWorktree => DiffType::HeadToWorktree,
2087        };
2088
2089        let mut diff = repository_handle
2090            .update(&mut cx, |repository_handle, cx| {
2091                repository_handle.diff(diff_type, cx)
2092            })?
2093            .await??;
2094        const ONE_MB: usize = 1_000_000;
2095        if diff.len() > ONE_MB {
2096            diff = diff.chars().take(ONE_MB).collect()
2097        }
2098
2099        Ok(proto::GitDiffResponse { diff })
2100    }
2101
2102    async fn handle_open_unstaged_diff(
2103        this: Entity<Self>,
2104        request: TypedEnvelope<proto::OpenUnstagedDiff>,
2105        mut cx: AsyncApp,
2106    ) -> Result<proto::OpenUnstagedDiffResponse> {
2107        let buffer_id = BufferId::new(request.payload.buffer_id)?;
2108        let diff = this
2109            .update(&mut cx, |this, cx| {
2110                let buffer = this.buffer_store.read(cx).get(buffer_id)?;
2111                Some(this.open_unstaged_diff(buffer, cx))
2112            })?
2113            .context("missing buffer")?
2114            .await?;
2115        this.update(&mut cx, |this, _| {
2116            let shared_diffs = this
2117                .shared_diffs
2118                .entry(request.original_sender_id.unwrap_or(request.sender_id))
2119                .or_default();
2120            shared_diffs.entry(buffer_id).or_default().unstaged = Some(diff.clone());
2121        })?;
2122        let staged_text = diff.read_with(&cx, |diff, _| diff.base_text_string())?;
2123        Ok(proto::OpenUnstagedDiffResponse { staged_text })
2124    }
2125
2126    async fn handle_open_uncommitted_diff(
2127        this: Entity<Self>,
2128        request: TypedEnvelope<proto::OpenUncommittedDiff>,
2129        mut cx: AsyncApp,
2130    ) -> Result<proto::OpenUncommittedDiffResponse> {
2131        let buffer_id = BufferId::new(request.payload.buffer_id)?;
2132        let diff = this
2133            .update(&mut cx, |this, cx| {
2134                let buffer = this.buffer_store.read(cx).get(buffer_id)?;
2135                Some(this.open_uncommitted_diff(buffer, cx))
2136            })?
2137            .context("missing buffer")?
2138            .await?;
2139        this.update(&mut cx, |this, _| {
2140            let shared_diffs = this
2141                .shared_diffs
2142                .entry(request.original_sender_id.unwrap_or(request.sender_id))
2143                .or_default();
2144            shared_diffs.entry(buffer_id).or_default().uncommitted = Some(diff.clone());
2145        })?;
2146        diff.read_with(&cx, |diff, cx| {
2147            use proto::open_uncommitted_diff_response::Mode;
2148
2149            let unstaged_diff = diff.secondary_diff();
2150            let index_snapshot = unstaged_diff.and_then(|diff| {
2151                let diff = diff.read(cx);
2152                diff.base_text_exists().then(|| diff.base_text())
2153            });
2154
2155            let mode;
2156            let staged_text;
2157            let committed_text;
2158            if diff.base_text_exists() {
2159                let committed_snapshot = diff.base_text();
2160                committed_text = Some(committed_snapshot.text());
2161                if let Some(index_text) = index_snapshot {
2162                    if index_text.remote_id() == committed_snapshot.remote_id() {
2163                        mode = Mode::IndexMatchesHead;
2164                        staged_text = None;
2165                    } else {
2166                        mode = Mode::IndexAndHead;
2167                        staged_text = Some(index_text.text());
2168                    }
2169                } else {
2170                    mode = Mode::IndexAndHead;
2171                    staged_text = None;
2172                }
2173            } else {
2174                mode = Mode::IndexAndHead;
2175                committed_text = None;
2176                staged_text = index_snapshot.as_ref().map(|buffer| buffer.text());
2177            }
2178
2179            proto::OpenUncommittedDiffResponse {
2180                committed_text,
2181                staged_text,
2182                mode: mode.into(),
2183            }
2184        })
2185    }
2186
2187    async fn handle_update_diff_bases(
2188        this: Entity<Self>,
2189        request: TypedEnvelope<proto::UpdateDiffBases>,
2190        mut cx: AsyncApp,
2191    ) -> Result<()> {
2192        let buffer_id = BufferId::new(request.payload.buffer_id)?;
2193        this.update(&mut cx, |this, cx| {
2194            if let Some(diff_state) = this.diffs.get_mut(&buffer_id)
2195                && let Some(buffer) = this.buffer_store.read(cx).get(buffer_id)
2196            {
2197                let buffer = buffer.read(cx).text_snapshot();
2198                diff_state.update(cx, |diff_state, cx| {
2199                    diff_state.handle_base_texts_updated(buffer, request.payload, cx);
2200                })
2201            }
2202        })
2203    }
2204
2205    async fn handle_blame_buffer(
2206        this: Entity<Self>,
2207        envelope: TypedEnvelope<proto::BlameBuffer>,
2208        mut cx: AsyncApp,
2209    ) -> Result<proto::BlameBufferResponse> {
2210        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2211        let version = deserialize_version(&envelope.payload.version);
2212        let buffer = this.read_with(&cx, |this, cx| {
2213            this.buffer_store.read(cx).get_existing(buffer_id)
2214        })??;
2215        buffer
2216            .update(&mut cx, |buffer, _| {
2217                buffer.wait_for_version(version.clone())
2218            })?
2219            .await?;
2220        let blame = this
2221            .update(&mut cx, |this, cx| {
2222                this.blame_buffer(&buffer, Some(version), cx)
2223            })?
2224            .await?;
2225        Ok(serialize_blame_buffer_response(blame))
2226    }
2227
2228    async fn handle_get_permalink_to_line(
2229        this: Entity<Self>,
2230        envelope: TypedEnvelope<proto::GetPermalinkToLine>,
2231        mut cx: AsyncApp,
2232    ) -> Result<proto::GetPermalinkToLineResponse> {
2233        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2234        // let version = deserialize_version(&envelope.payload.version);
2235        let selection = {
2236            let proto_selection = envelope
2237                .payload
2238                .selection
2239                .context("no selection to get permalink for defined")?;
2240            proto_selection.start as u32..proto_selection.end as u32
2241        };
2242        let buffer = this.read_with(&cx, |this, cx| {
2243            this.buffer_store.read(cx).get_existing(buffer_id)
2244        })??;
2245        let permalink = this
2246            .update(&mut cx, |this, cx| {
2247                this.get_permalink_to_line(&buffer, selection, cx)
2248            })?
2249            .await?;
2250        Ok(proto::GetPermalinkToLineResponse {
2251            permalink: permalink.to_string(),
2252        })
2253    }
2254
2255    fn repository_for_request(
2256        this: &Entity<Self>,
2257        id: RepositoryId,
2258        cx: &mut AsyncApp,
2259    ) -> Result<Entity<Repository>> {
2260        this.read_with(cx, |this, _| {
2261            this.repositories
2262                .get(&id)
2263                .context("missing repository handle")
2264                .cloned()
2265        })?
2266    }
2267
2268    pub fn repo_snapshots(&self, cx: &App) -> HashMap<RepositoryId, RepositorySnapshot> {
2269        self.repositories
2270            .iter()
2271            .map(|(id, repo)| (*id, repo.read(cx).snapshot.clone()))
2272            .collect()
2273    }
2274
2275    fn process_updated_entries(
2276        &self,
2277        worktree: &Entity<Worktree>,
2278        updated_entries: &[(Arc<Path>, ProjectEntryId, PathChange)],
2279        cx: &mut App,
2280    ) -> Task<HashMap<Entity<Repository>, Vec<RepoPath>>> {
2281        let mut repo_paths = self
2282            .repositories
2283            .values()
2284            .map(|repo| (repo.read(cx).work_directory_abs_path.clone(), repo.clone()))
2285            .collect::<Vec<_>>();
2286        let mut entries: Vec<_> = updated_entries
2287            .iter()
2288            .map(|(path, _, _)| path.clone())
2289            .collect();
2290        entries.sort();
2291        let worktree = worktree.read(cx);
2292
2293        let entries = entries
2294            .into_iter()
2295            .filter_map(|path| worktree.absolutize(&path).ok())
2296            .collect::<Arc<[_]>>();
2297
2298        let executor = cx.background_executor().clone();
2299        cx.background_executor().spawn(async move {
2300            repo_paths.sort_by(|lhs, rhs| lhs.0.cmp(&rhs.0));
2301            let mut paths_by_git_repo = HashMap::<_, Vec<_>>::default();
2302            let mut tasks = FuturesOrdered::new();
2303            for (repo_path, repo) in repo_paths.into_iter().rev() {
2304                let entries = entries.clone();
2305                let task = executor.spawn(async move {
2306                    // Find all repository paths that belong to this repo
2307                    let mut ix = entries.partition_point(|path| path < &*repo_path);
2308                    if ix == entries.len() {
2309                        return None;
2310                    };
2311
2312                    let mut paths = Vec::new();
2313                    // All paths prefixed by a given repo will constitute a continuous range.
2314                    while let Some(path) = entries.get(ix)
2315                        && let Some(repo_path) =
2316                            RepositorySnapshot::abs_path_to_repo_path_inner(&repo_path, path)
2317                    {
2318                        paths.push((repo_path, ix));
2319                        ix += 1;
2320                    }
2321                    if paths.is_empty() {
2322                        None
2323                    } else {
2324                        Some((repo, paths))
2325                    }
2326                });
2327                tasks.push_back(task);
2328            }
2329
2330            // Now, let's filter out the "duplicate" entries that were processed by multiple distinct repos.
2331            let mut path_was_used = vec![false; entries.len()];
2332            let tasks = tasks.collect::<Vec<_>>().await;
2333            // Process tasks from the back: iterating backwards allows us to see more-specific paths first.
2334            // We always want to assign a path to it's innermost repository.
2335            for t in tasks {
2336                let Some((repo, paths)) = t else {
2337                    continue;
2338                };
2339                let entry = paths_by_git_repo.entry(repo).or_default();
2340                for (repo_path, ix) in paths {
2341                    if path_was_used[ix] {
2342                        continue;
2343                    }
2344                    path_was_used[ix] = true;
2345                    entry.push(repo_path);
2346                }
2347            }
2348
2349            paths_by_git_repo
2350        })
2351    }
2352}
2353
2354impl BufferGitState {
2355    fn new() -> Self {
2356        Self {
2357            unstaged_diff: Default::default(),
2358            uncommitted_diff: Default::default(),
2359            branch_diff: Default::default(),
2360            recalculate_diff_task: Default::default(),
2361            language: Default::default(),
2362            language_registry: Default::default(),
2363            recalculating_tx: postage::watch::channel_with(false).0,
2364            hunk_staging_operation_count: 0,
2365            hunk_staging_operation_count_as_of_write: 0,
2366            head_text: Default::default(),
2367            index_text: Default::default(),
2368            head_changed: Default::default(),
2369            index_changed: Default::default(),
2370            language_changed: Default::default(),
2371            conflict_updated_futures: Default::default(),
2372            conflict_set: Default::default(),
2373            reparse_conflict_markers_task: Default::default(),
2374        }
2375    }
2376
2377    fn buffer_language_changed(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) {
2378        self.language = buffer.read(cx).language().cloned();
2379        self.language_changed = true;
2380        let _ = self.recalculate_diffs(buffer.read(cx).text_snapshot(), cx);
2381    }
2382
2383    fn reparse_conflict_markers(
2384        &mut self,
2385        buffer: text::BufferSnapshot,
2386        cx: &mut Context<Self>,
2387    ) -> oneshot::Receiver<()> {
2388        let (tx, rx) = oneshot::channel();
2389
2390        let Some(conflict_set) = self
2391            .conflict_set
2392            .as_ref()
2393            .and_then(|conflict_set| conflict_set.upgrade())
2394        else {
2395            return rx;
2396        };
2397
2398        let old_snapshot = conflict_set.read_with(cx, |conflict_set, _| {
2399            if conflict_set.has_conflict {
2400                Some(conflict_set.snapshot())
2401            } else {
2402                None
2403            }
2404        });
2405
2406        if let Some(old_snapshot) = old_snapshot {
2407            self.conflict_updated_futures.push(tx);
2408            self.reparse_conflict_markers_task = Some(cx.spawn(async move |this, cx| {
2409                let (snapshot, changed_range) = cx
2410                    .background_spawn(async move {
2411                        let new_snapshot = ConflictSet::parse(&buffer);
2412                        let changed_range = old_snapshot.compare(&new_snapshot, &buffer);
2413                        (new_snapshot, changed_range)
2414                    })
2415                    .await;
2416                this.update(cx, |this, cx| {
2417                    if let Some(conflict_set) = &this.conflict_set {
2418                        conflict_set
2419                            .update(cx, |conflict_set, cx| {
2420                                conflict_set.set_snapshot(snapshot, changed_range, cx);
2421                            })
2422                            .ok();
2423                    }
2424                    let futures = std::mem::take(&mut this.conflict_updated_futures);
2425                    for tx in futures {
2426                        tx.send(()).ok();
2427                    }
2428                })
2429            }))
2430        }
2431
2432        rx
2433    }
2434
2435    fn unstaged_diff(&self) -> Option<Entity<BufferDiff>> {
2436        self.unstaged_diff.as_ref().and_then(|set| set.upgrade())
2437    }
2438
2439    fn uncommitted_diff(&self) -> Option<Entity<BufferDiff>> {
2440        self.uncommitted_diff.as_ref().and_then(|set| set.upgrade())
2441    }
2442
2443    fn handle_base_texts_updated(
2444        &mut self,
2445        buffer: text::BufferSnapshot,
2446        message: proto::UpdateDiffBases,
2447        cx: &mut Context<Self>,
2448    ) {
2449        use proto::update_diff_bases::Mode;
2450
2451        let Some(mode) = Mode::from_i32(message.mode) else {
2452            return;
2453        };
2454
2455        let diff_bases_change = match mode {
2456            Mode::HeadOnly => DiffBasesChange::SetHead(message.committed_text),
2457            Mode::IndexOnly => DiffBasesChange::SetIndex(message.staged_text),
2458            Mode::IndexMatchesHead => DiffBasesChange::SetBoth(message.committed_text),
2459            Mode::IndexAndHead => DiffBasesChange::SetEach {
2460                index: message.staged_text,
2461                head: message.committed_text,
2462            },
2463        };
2464
2465        self.diff_bases_changed(buffer, Some(diff_bases_change), cx);
2466    }
2467
2468    pub fn wait_for_recalculation(&mut self) -> Option<impl Future<Output = ()> + use<>> {
2469        if *self.recalculating_tx.borrow() {
2470            let mut rx = self.recalculating_tx.subscribe();
2471            Some(async move {
2472                loop {
2473                    let is_recalculating = rx.recv().await;
2474                    if is_recalculating != Some(true) {
2475                        break;
2476                    }
2477                }
2478            })
2479        } else {
2480            None
2481        }
2482    }
2483
2484    fn diff_bases_changed(
2485        &mut self,
2486        buffer: text::BufferSnapshot,
2487        diff_bases_change: Option<DiffBasesChange>,
2488        cx: &mut Context<Self>,
2489    ) {
2490        match diff_bases_change {
2491            Some(DiffBasesChange::SetIndex(index)) => {
2492                self.index_text = index.map(|mut index| {
2493                    text::LineEnding::normalize(&mut index);
2494                    Arc::new(index)
2495                });
2496                self.index_changed = true;
2497            }
2498            Some(DiffBasesChange::SetHead(head)) => {
2499                self.head_text = head.map(|mut head| {
2500                    text::LineEnding::normalize(&mut head);
2501                    Arc::new(head)
2502                });
2503                self.head_changed = true;
2504            }
2505            Some(DiffBasesChange::SetBoth(text)) => {
2506                let text = text.map(|mut text| {
2507                    text::LineEnding::normalize(&mut text);
2508                    Arc::new(text)
2509                });
2510                self.head_text = text.clone();
2511                self.index_text = text;
2512                self.head_changed = true;
2513                self.index_changed = true;
2514            }
2515            Some(DiffBasesChange::SetEach { index, head }) => {
2516                self.index_text = index.map(|mut index| {
2517                    text::LineEnding::normalize(&mut index);
2518                    Arc::new(index)
2519                });
2520                self.index_changed = true;
2521                self.head_text = head.map(|mut head| {
2522                    text::LineEnding::normalize(&mut head);
2523                    Arc::new(head)
2524                });
2525                self.head_changed = true;
2526            }
2527            None => {}
2528        }
2529
2530        self.recalculate_diffs(buffer, cx)
2531    }
2532
2533    fn recalculate_diffs(&mut self, buffer: text::BufferSnapshot, cx: &mut Context<Self>) {
2534        *self.recalculating_tx.borrow_mut() = true;
2535
2536        let language = self.language.clone();
2537        let language_registry = self.language_registry.clone();
2538        let unstaged_diff = self.unstaged_diff();
2539        let uncommitted_diff = self.uncommitted_diff();
2540        let head = self.head_text.clone();
2541        let index = self.index_text.clone();
2542        let index_changed = self.index_changed;
2543        let head_changed = self.head_changed;
2544        let language_changed = self.language_changed;
2545        let prev_hunk_staging_operation_count = self.hunk_staging_operation_count_as_of_write;
2546        let index_matches_head = match (self.index_text.as_ref(), self.head_text.as_ref()) {
2547            (Some(index), Some(head)) => Arc::ptr_eq(index, head),
2548            (None, None) => true,
2549            _ => false,
2550        };
2551        self.recalculate_diff_task = Some(cx.spawn(async move |this, cx| {
2552            log::debug!(
2553                "start recalculating diffs for buffer {}",
2554                buffer.remote_id()
2555            );
2556
2557            let mut new_unstaged_diff = None;
2558            if let Some(unstaged_diff) = &unstaged_diff {
2559                new_unstaged_diff = Some(
2560                    BufferDiff::update_diff(
2561                        unstaged_diff.clone(),
2562                        buffer.clone(),
2563                        index,
2564                        index_changed,
2565                        language_changed,
2566                        language.clone(),
2567                        language_registry.clone(),
2568                        cx,
2569                    )
2570                    .await?,
2571                );
2572            }
2573
2574            let mut new_uncommitted_diff = None;
2575            if let Some(uncommitted_diff) = &uncommitted_diff {
2576                new_uncommitted_diff = if index_matches_head {
2577                    new_unstaged_diff.clone()
2578                } else {
2579                    Some(
2580                        BufferDiff::update_diff(
2581                            uncommitted_diff.clone(),
2582                            buffer.clone(),
2583                            head,
2584                            head_changed,
2585                            language_changed,
2586                            language.clone(),
2587                            language_registry.clone(),
2588                            cx,
2589                        )
2590                        .await?,
2591                    )
2592                }
2593            }
2594
2595            let cancel = this.update(cx, |this, _| {
2596                // This checks whether all pending stage/unstage operations
2597                // have quiesced (i.e. both the corresponding write and the
2598                // read of that write have completed). If not, then we cancel
2599                // this recalculation attempt to avoid invalidating pending
2600                // state too quickly; another recalculation will come along
2601                // later and clear the pending state once the state of the index has settled.
2602                if this.hunk_staging_operation_count > prev_hunk_staging_operation_count {
2603                    *this.recalculating_tx.borrow_mut() = false;
2604                    true
2605                } else {
2606                    false
2607                }
2608            })?;
2609            if cancel {
2610                log::debug!(
2611                    concat!(
2612                        "aborting recalculating diffs for buffer {}",
2613                        "due to subsequent hunk operations",
2614                    ),
2615                    buffer.remote_id()
2616                );
2617                return Ok(());
2618            }
2619
2620            let unstaged_changed_range = if let Some((unstaged_diff, new_unstaged_diff)) =
2621                unstaged_diff.as_ref().zip(new_unstaged_diff.clone())
2622            {
2623                unstaged_diff.update(cx, |diff, cx| {
2624                    if language_changed {
2625                        diff.language_changed(cx);
2626                    }
2627                    diff.set_snapshot(new_unstaged_diff, &buffer, cx)
2628                })?
2629            } else {
2630                None
2631            };
2632
2633            if let Some((uncommitted_diff, new_uncommitted_diff)) =
2634                uncommitted_diff.as_ref().zip(new_uncommitted_diff.clone())
2635            {
2636                uncommitted_diff.update(cx, |diff, cx| {
2637                    if language_changed {
2638                        diff.language_changed(cx);
2639                    }
2640                    diff.set_snapshot_with_secondary(
2641                        new_uncommitted_diff,
2642                        &buffer,
2643                        unstaged_changed_range,
2644                        true,
2645                        cx,
2646                    );
2647                })?;
2648            }
2649
2650            log::debug!(
2651                "finished recalculating diffs for buffer {}",
2652                buffer.remote_id()
2653            );
2654
2655            if let Some(this) = this.upgrade() {
2656                this.update(cx, |this, _| {
2657                    this.index_changed = false;
2658                    this.head_changed = false;
2659                    this.language_changed = false;
2660                    *this.recalculating_tx.borrow_mut() = false;
2661                })?;
2662            }
2663
2664            Ok(())
2665        }));
2666    }
2667}
2668
2669fn make_remote_delegate(
2670    this: Entity<GitStore>,
2671    project_id: u64,
2672    repository_id: RepositoryId,
2673    askpass_id: u64,
2674    cx: &mut AsyncApp,
2675) -> AskPassDelegate {
2676    AskPassDelegate::new(cx, move |prompt, tx, cx| {
2677        this.update(cx, |this, cx| {
2678            let Some((client, _)) = this.downstream_client() else {
2679                return;
2680            };
2681            let response = client.request(proto::AskPassRequest {
2682                project_id,
2683                repository_id: repository_id.to_proto(),
2684                askpass_id,
2685                prompt,
2686            });
2687            cx.spawn(async move |_, _| {
2688                tx.send(response.await?.response).ok();
2689                anyhow::Ok(())
2690            })
2691            .detach_and_log_err(cx);
2692        })
2693        .log_err();
2694    })
2695}
2696
2697impl RepositoryId {
2698    pub fn to_proto(self) -> u64 {
2699        self.0
2700    }
2701
2702    pub fn from_proto(id: u64) -> Self {
2703        RepositoryId(id)
2704    }
2705}
2706
2707impl RepositorySnapshot {
2708    fn empty(id: RepositoryId, work_directory_abs_path: Arc<Path>) -> Self {
2709        Self {
2710            id,
2711            statuses_by_path: Default::default(),
2712            work_directory_abs_path,
2713            branch: None,
2714            head_commit: None,
2715            scan_id: 0,
2716            merge: Default::default(),
2717            remote_origin_url: None,
2718            remote_upstream_url: None,
2719        }
2720    }
2721
2722    fn initial_update(&self, project_id: u64) -> proto::UpdateRepository {
2723        proto::UpdateRepository {
2724            branch_summary: self.branch.as_ref().map(branch_to_proto),
2725            head_commit_details: self.head_commit.as_ref().map(commit_details_to_proto),
2726            updated_statuses: self
2727                .statuses_by_path
2728                .iter()
2729                .map(|entry| entry.to_proto())
2730                .collect(),
2731            removed_statuses: Default::default(),
2732            current_merge_conflicts: self
2733                .merge
2734                .conflicted_paths
2735                .iter()
2736                .map(|repo_path| repo_path.to_proto())
2737                .collect(),
2738            merge_message: self.merge.message.as_ref().map(|msg| msg.to_string()),
2739            project_id,
2740            id: self.id.to_proto(),
2741            abs_path: self.work_directory_abs_path.to_proto(),
2742            entry_ids: vec![self.id.to_proto()],
2743            scan_id: self.scan_id,
2744            is_last_update: true,
2745        }
2746    }
2747
2748    fn build_update(&self, old: &Self, project_id: u64) -> proto::UpdateRepository {
2749        let mut updated_statuses: Vec<proto::StatusEntry> = Vec::new();
2750        let mut removed_statuses: Vec<String> = Vec::new();
2751
2752        let mut new_statuses = self.statuses_by_path.iter().peekable();
2753        let mut old_statuses = old.statuses_by_path.iter().peekable();
2754
2755        let mut current_new_entry = new_statuses.next();
2756        let mut current_old_entry = old_statuses.next();
2757        loop {
2758            match (current_new_entry, current_old_entry) {
2759                (Some(new_entry), Some(old_entry)) => {
2760                    match new_entry.repo_path.cmp(&old_entry.repo_path) {
2761                        Ordering::Less => {
2762                            updated_statuses.push(new_entry.to_proto());
2763                            current_new_entry = new_statuses.next();
2764                        }
2765                        Ordering::Equal => {
2766                            if new_entry.status != old_entry.status {
2767                                updated_statuses.push(new_entry.to_proto());
2768                            }
2769                            current_old_entry = old_statuses.next();
2770                            current_new_entry = new_statuses.next();
2771                        }
2772                        Ordering::Greater => {
2773                            removed_statuses.push(old_entry.repo_path.as_ref().to_proto());
2774                            current_old_entry = old_statuses.next();
2775                        }
2776                    }
2777                }
2778                (None, Some(old_entry)) => {
2779                    removed_statuses.push(old_entry.repo_path.as_ref().to_proto());
2780                    current_old_entry = old_statuses.next();
2781                }
2782                (Some(new_entry), None) => {
2783                    updated_statuses.push(new_entry.to_proto());
2784                    current_new_entry = new_statuses.next();
2785                }
2786                (None, None) => break,
2787            }
2788        }
2789
2790        proto::UpdateRepository {
2791            branch_summary: self.branch.as_ref().map(branch_to_proto),
2792            head_commit_details: self.head_commit.as_ref().map(commit_details_to_proto),
2793            updated_statuses,
2794            removed_statuses,
2795            current_merge_conflicts: self
2796                .merge
2797                .conflicted_paths
2798                .iter()
2799                .map(|path| path.as_ref().to_proto())
2800                .collect(),
2801            merge_message: self.merge.message.as_ref().map(|msg| msg.to_string()),
2802            project_id,
2803            id: self.id.to_proto(),
2804            abs_path: self.work_directory_abs_path.to_proto(),
2805            entry_ids: vec![],
2806            scan_id: self.scan_id,
2807            is_last_update: true,
2808        }
2809    }
2810
2811    pub fn status(&self) -> impl Iterator<Item = StatusEntry> + '_ {
2812        self.statuses_by_path.iter().cloned()
2813    }
2814
2815    pub fn status_summary(&self) -> GitSummary {
2816        self.statuses_by_path.summary().item_summary
2817    }
2818
2819    pub fn status_for_path(&self, path: &RepoPath) -> Option<StatusEntry> {
2820        self.statuses_by_path
2821            .get(&PathKey(path.0.clone()), &())
2822            .cloned()
2823    }
2824
2825    pub fn abs_path_to_repo_path(&self, abs_path: &Path) -> Option<RepoPath> {
2826        Self::abs_path_to_repo_path_inner(&self.work_directory_abs_path, abs_path)
2827    }
2828
2829    #[inline]
2830    fn abs_path_to_repo_path_inner(
2831        work_directory_abs_path: &Path,
2832        abs_path: &Path,
2833    ) -> Option<RepoPath> {
2834        abs_path
2835            .strip_prefix(&work_directory_abs_path)
2836            .map(RepoPath::from)
2837            .ok()
2838    }
2839
2840    pub fn had_conflict_on_last_merge_head_change(&self, repo_path: &RepoPath) -> bool {
2841        self.merge.conflicted_paths.contains(repo_path)
2842    }
2843
2844    pub fn has_conflict(&self, repo_path: &RepoPath) -> bool {
2845        let had_conflict_on_last_merge_head_change =
2846            self.merge.conflicted_paths.contains(repo_path);
2847        let has_conflict_currently = self
2848            .status_for_path(repo_path)
2849            .is_some_and(|entry| entry.status.is_conflicted());
2850        had_conflict_on_last_merge_head_change || has_conflict_currently
2851    }
2852
2853    /// This is the name that will be displayed in the repository selector for this repository.
2854    pub fn display_name(&self) -> SharedString {
2855        self.work_directory_abs_path
2856            .file_name()
2857            .unwrap_or_default()
2858            .to_string_lossy()
2859            .to_string()
2860            .into()
2861    }
2862}
2863
2864impl MergeDetails {
2865    async fn load(
2866        backend: &Arc<dyn GitRepository>,
2867        status: &SumTree<StatusEntry>,
2868        prev_snapshot: &RepositorySnapshot,
2869    ) -> Result<(MergeDetails, bool)> {
2870        log::debug!("load merge details");
2871        let message = backend.merge_message().await;
2872        let heads = backend
2873            .revparse_batch(vec![
2874                "MERGE_HEAD".into(),
2875                "CHERRY_PICK_HEAD".into(),
2876                "REBASE_HEAD".into(),
2877                "REVERT_HEAD".into(),
2878                "APPLY_HEAD".into(),
2879            ])
2880            .await
2881            .log_err()
2882            .unwrap_or_default()
2883            .into_iter()
2884            .map(|opt| opt.map(SharedString::from))
2885            .collect::<Vec<_>>();
2886        let merge_heads_changed = heads != prev_snapshot.merge.heads;
2887        let conflicted_paths = if merge_heads_changed {
2888            let current_conflicted_paths = TreeSet::from_ordered_entries(
2889                status
2890                    .iter()
2891                    .filter(|entry| entry.status.is_conflicted())
2892                    .map(|entry| entry.repo_path.clone()),
2893            );
2894
2895            // It can happen that we run a scan while a lengthy merge is in progress
2896            // that will eventually result in conflicts, but before those conflicts
2897            // are reported by `git status`. Since for the moment we only care about
2898            // the merge heads state for the purposes of tracking conflicts, don't update
2899            // this state until we see some conflicts.
2900            if heads.iter().any(Option::is_some)
2901                && !prev_snapshot.merge.heads.iter().any(Option::is_some)
2902                && current_conflicted_paths.is_empty()
2903            {
2904                log::debug!("not updating merge heads because no conflicts found");
2905                return Ok((
2906                    MergeDetails {
2907                        message: message.map(SharedString::from),
2908                        ..prev_snapshot.merge.clone()
2909                    },
2910                    false,
2911                ));
2912            }
2913
2914            current_conflicted_paths
2915        } else {
2916            prev_snapshot.merge.conflicted_paths.clone()
2917        };
2918        let details = MergeDetails {
2919            conflicted_paths,
2920            message: message.map(SharedString::from),
2921            heads,
2922        };
2923        Ok((details, merge_heads_changed))
2924    }
2925}
2926
2927impl Repository {
2928    pub fn snapshot(&self) -> RepositorySnapshot {
2929        self.snapshot.clone()
2930    }
2931
2932    fn local(
2933        id: RepositoryId,
2934        work_directory_abs_path: Arc<Path>,
2935        dot_git_abs_path: Arc<Path>,
2936        repository_dir_abs_path: Arc<Path>,
2937        common_dir_abs_path: Arc<Path>,
2938        project_environment: WeakEntity<ProjectEnvironment>,
2939        fs: Arc<dyn Fs>,
2940        git_store: WeakEntity<GitStore>,
2941        cx: &mut Context<Self>,
2942    ) -> Self {
2943        let snapshot = RepositorySnapshot::empty(id, work_directory_abs_path.clone());
2944        Repository {
2945            this: cx.weak_entity(),
2946            git_store,
2947            snapshot,
2948            commit_message_buffer: None,
2949            askpass_delegates: Default::default(),
2950            paths_needing_status_update: Default::default(),
2951            latest_askpass_id: 0,
2952            job_sender: Repository::spawn_local_git_worker(
2953                work_directory_abs_path,
2954                dot_git_abs_path,
2955                repository_dir_abs_path,
2956                common_dir_abs_path,
2957                project_environment,
2958                fs,
2959                cx,
2960            ),
2961            job_id: 0,
2962            active_jobs: Default::default(),
2963        }
2964    }
2965
2966    fn remote(
2967        id: RepositoryId,
2968        work_directory_abs_path: Arc<Path>,
2969        project_id: ProjectId,
2970        client: AnyProtoClient,
2971        git_store: WeakEntity<GitStore>,
2972        cx: &mut Context<Self>,
2973    ) -> Self {
2974        let snapshot = RepositorySnapshot::empty(id, work_directory_abs_path);
2975        Self {
2976            this: cx.weak_entity(),
2977            snapshot,
2978            commit_message_buffer: None,
2979            git_store,
2980            paths_needing_status_update: Default::default(),
2981            job_sender: Self::spawn_remote_git_worker(project_id, client, cx),
2982            askpass_delegates: Default::default(),
2983            latest_askpass_id: 0,
2984            active_jobs: Default::default(),
2985            job_id: 0,
2986        }
2987    }
2988
2989    pub fn git_store(&self) -> Option<Entity<GitStore>> {
2990        self.git_store.upgrade()
2991    }
2992
2993    fn reload_buffer_diff_bases(&mut self, cx: &mut Context<Self>) {
2994        let this = cx.weak_entity();
2995        let git_store = self.git_store.clone();
2996        let _ = self.send_keyed_job(
2997            Some(GitJobKey::ReloadBufferDiffBases),
2998            None,
2999            |state, mut cx| async move {
3000                let RepositoryState::Local { backend, .. } = state else {
3001                    log::error!("tried to recompute diffs for a non-local repository");
3002                    return Ok(());
3003                };
3004
3005                let Some(this) = this.upgrade() else {
3006                    return Ok(());
3007                };
3008
3009                let repo_diff_state_updates = this.update(&mut cx, |this, cx| {
3010                    git_store.update(cx, |git_store, cx| {
3011                        git_store
3012                            .diffs
3013                            .iter()
3014                            .filter_map(|(buffer_id, diff_state)| {
3015                                let buffer_store = git_store.buffer_store.read(cx);
3016                                let buffer = buffer_store.get(*buffer_id)?;
3017                                let file = File::from_dyn(buffer.read(cx).file())?;
3018                                let abs_path =
3019                                    file.worktree.read(cx).absolutize(&file.path).ok()?;
3020                                let repo_path = this.abs_path_to_repo_path(&abs_path)?;
3021                                log::debug!(
3022                                    "start reload diff bases for repo path {}",
3023                                    repo_path.0.display()
3024                                );
3025                                diff_state.update(cx, |diff_state, _| {
3026                                    let has_unstaged_diff = diff_state
3027                                        .unstaged_diff
3028                                        .as_ref()
3029                                        .is_some_and(|diff| diff.is_upgradable());
3030                                    let has_uncommitted_diff = diff_state
3031                                        .uncommitted_diff
3032                                        .as_ref()
3033                                        .is_some_and(|set| set.is_upgradable());
3034
3035                                    Some((
3036                                        buffer,
3037                                        repo_path,
3038                                        has_unstaged_diff.then(|| diff_state.index_text.clone()),
3039                                        has_uncommitted_diff.then(|| diff_state.head_text.clone()),
3040                                    ))
3041                                })
3042                            })
3043                            .collect::<Vec<_>>()
3044                    })
3045                })??;
3046
3047                let buffer_diff_base_changes = cx
3048                    .background_spawn(async move {
3049                        let mut changes = Vec::new();
3050                        for (buffer, repo_path, current_index_text, current_head_text) in
3051                            &repo_diff_state_updates
3052                        {
3053                            let index_text = if current_index_text.is_some() {
3054                                backend.load_index_text(repo_path.clone()).await
3055                            } else {
3056                                None
3057                            };
3058                            let head_text = if current_head_text.is_some() {
3059                                backend.load_committed_text(repo_path.clone()).await
3060                            } else {
3061                                None
3062                            };
3063
3064                            let change =
3065                                match (current_index_text.as_ref(), current_head_text.as_ref()) {
3066                                    (Some(current_index), Some(current_head)) => {
3067                                        let index_changed =
3068                                            index_text.as_ref() != current_index.as_deref();
3069                                        let head_changed =
3070                                            head_text.as_ref() != current_head.as_deref();
3071                                        if index_changed && head_changed {
3072                                            if index_text == head_text {
3073                                                Some(DiffBasesChange::SetBoth(head_text))
3074                                            } else {
3075                                                Some(DiffBasesChange::SetEach {
3076                                                    index: index_text,
3077                                                    head: head_text,
3078                                                })
3079                                            }
3080                                        } else if index_changed {
3081                                            Some(DiffBasesChange::SetIndex(index_text))
3082                                        } else if head_changed {
3083                                            Some(DiffBasesChange::SetHead(head_text))
3084                                        } else {
3085                                            None
3086                                        }
3087                                    }
3088                                    (Some(current_index), None) => {
3089                                        let index_changed =
3090                                            index_text.as_ref() != current_index.as_deref();
3091                                        index_changed
3092                                            .then_some(DiffBasesChange::SetIndex(index_text))
3093                                    }
3094                                    (None, Some(current_head)) => {
3095                                        let head_changed =
3096                                            head_text.as_ref() != current_head.as_deref();
3097                                        head_changed.then_some(DiffBasesChange::SetHead(head_text))
3098                                    }
3099                                    (None, None) => None,
3100                                };
3101
3102                            changes.push((buffer.clone(), change))
3103                        }
3104                        changes
3105                    })
3106                    .await;
3107
3108                git_store.update(&mut cx, |git_store, cx| {
3109                    for (buffer, diff_bases_change) in buffer_diff_base_changes {
3110                        let buffer_snapshot = buffer.read(cx).text_snapshot();
3111                        let buffer_id = buffer_snapshot.remote_id();
3112                        let Some(diff_state) = git_store.diffs.get(&buffer_id) else {
3113                            continue;
3114                        };
3115
3116                        let downstream_client = git_store.downstream_client();
3117                        diff_state.update(cx, |diff_state, cx| {
3118                            use proto::update_diff_bases::Mode;
3119
3120                            if let Some((diff_bases_change, (client, project_id))) =
3121                                diff_bases_change.clone().zip(downstream_client)
3122                            {
3123                                let (staged_text, committed_text, mode) = match diff_bases_change {
3124                                    DiffBasesChange::SetIndex(index) => {
3125                                        (index, None, Mode::IndexOnly)
3126                                    }
3127                                    DiffBasesChange::SetHead(head) => (None, head, Mode::HeadOnly),
3128                                    DiffBasesChange::SetEach { index, head } => {
3129                                        (index, head, Mode::IndexAndHead)
3130                                    }
3131                                    DiffBasesChange::SetBoth(text) => {
3132                                        (None, text, Mode::IndexMatchesHead)
3133                                    }
3134                                };
3135                                client
3136                                    .send(proto::UpdateDiffBases {
3137                                        project_id: project_id.to_proto(),
3138                                        buffer_id: buffer_id.to_proto(),
3139                                        staged_text,
3140                                        committed_text,
3141                                        mode: mode as i32,
3142                                    })
3143                                    .log_err();
3144                            }
3145
3146                            diff_state.diff_bases_changed(buffer_snapshot, diff_bases_change, cx);
3147                        });
3148                    }
3149                })
3150            },
3151        );
3152    }
3153
3154    pub fn send_job<F, Fut, R>(
3155        &mut self,
3156        status: Option<SharedString>,
3157        job: F,
3158    ) -> oneshot::Receiver<R>
3159    where
3160        F: FnOnce(RepositoryState, AsyncApp) -> Fut + 'static,
3161        Fut: Future<Output = R> + 'static,
3162        R: Send + 'static,
3163    {
3164        self.send_keyed_job(None, status, job)
3165    }
3166
3167    fn send_keyed_job<F, Fut, R>(
3168        &mut self,
3169        key: Option<GitJobKey>,
3170        status: Option<SharedString>,
3171        job: F,
3172    ) -> oneshot::Receiver<R>
3173    where
3174        F: FnOnce(RepositoryState, AsyncApp) -> Fut + 'static,
3175        Fut: Future<Output = R> + 'static,
3176        R: Send + 'static,
3177    {
3178        let (result_tx, result_rx) = futures::channel::oneshot::channel();
3179        let job_id = post_inc(&mut self.job_id);
3180        let this = self.this.clone();
3181        self.job_sender
3182            .unbounded_send(GitJob {
3183                key,
3184                job: Box::new(move |state, cx: &mut AsyncApp| {
3185                    let job = job(state, cx.clone());
3186                    cx.spawn(async move |cx| {
3187                        if let Some(s) = status.clone() {
3188                            this.update(cx, |this, cx| {
3189                                this.active_jobs.insert(
3190                                    job_id,
3191                                    JobInfo {
3192                                        start: Instant::now(),
3193                                        message: s.clone(),
3194                                    },
3195                                );
3196
3197                                cx.notify();
3198                            })
3199                            .ok();
3200                        }
3201                        let result = job.await;
3202
3203                        this.update(cx, |this, cx| {
3204                            this.active_jobs.remove(&job_id);
3205                            cx.notify();
3206                        })
3207                        .ok();
3208
3209                        result_tx.send(result).ok();
3210                    })
3211                }),
3212            })
3213            .ok();
3214        result_rx
3215    }
3216
3217    pub fn set_as_active_repository(&self, cx: &mut Context<Self>) {
3218        let Some(git_store) = self.git_store.upgrade() else {
3219            return;
3220        };
3221        let entity = cx.entity();
3222        git_store.update(cx, |git_store, cx| {
3223            let Some((&id, _)) = git_store
3224                .repositories
3225                .iter()
3226                .find(|(_, handle)| *handle == &entity)
3227            else {
3228                return;
3229            };
3230            git_store.active_repo_id = Some(id);
3231            cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
3232        });
3233    }
3234
3235    pub fn cached_status(&self) -> impl '_ + Iterator<Item = StatusEntry> {
3236        self.snapshot.status()
3237    }
3238
3239    pub fn repo_path_to_project_path(&self, path: &RepoPath, cx: &App) -> Option<ProjectPath> {
3240        let git_store = self.git_store.upgrade()?;
3241        let worktree_store = git_store.read(cx).worktree_store.read(cx);
3242        let abs_path = self.snapshot.work_directory_abs_path.join(&path.0);
3243        let abs_path = SanitizedPath::new(&abs_path);
3244        let (worktree, relative_path) = worktree_store.find_worktree(abs_path, cx)?;
3245        Some(ProjectPath {
3246            worktree_id: worktree.read(cx).id(),
3247            path: relative_path.into(),
3248        })
3249    }
3250
3251    pub fn project_path_to_repo_path(&self, path: &ProjectPath, cx: &App) -> Option<RepoPath> {
3252        let git_store = self.git_store.upgrade()?;
3253        let worktree_store = git_store.read(cx).worktree_store.read(cx);
3254        let abs_path = worktree_store.absolutize(path, cx)?;
3255        self.snapshot.abs_path_to_repo_path(&abs_path)
3256    }
3257
3258    pub fn contains_sub_repo(&self, other: &Entity<Self>, cx: &App) -> bool {
3259        other
3260            .read(cx)
3261            .snapshot
3262            .work_directory_abs_path
3263            .starts_with(&self.snapshot.work_directory_abs_path)
3264    }
3265
3266    pub fn open_commit_buffer(
3267        &mut self,
3268        languages: Option<Arc<LanguageRegistry>>,
3269        buffer_store: Entity<BufferStore>,
3270        cx: &mut Context<Self>,
3271    ) -> Task<Result<Entity<Buffer>>> {
3272        let id = self.id;
3273        if let Some(buffer) = self.commit_message_buffer.clone() {
3274            return Task::ready(Ok(buffer));
3275        }
3276        let this = cx.weak_entity();
3277
3278        let rx = self.send_job(None, move |state, mut cx| async move {
3279            let Some(this) = this.upgrade() else {
3280                bail!("git store was dropped");
3281            };
3282            match state {
3283                RepositoryState::Local { .. } => {
3284                    this.update(&mut cx, |_, cx| {
3285                        Self::open_local_commit_buffer(languages, buffer_store, cx)
3286                    })?
3287                    .await
3288                }
3289                RepositoryState::Remote { project_id, client } => {
3290                    let request = client.request(proto::OpenCommitMessageBuffer {
3291                        project_id: project_id.0,
3292                        repository_id: id.to_proto(),
3293                    });
3294                    let response = request.await.context("requesting to open commit buffer")?;
3295                    let buffer_id = BufferId::new(response.buffer_id)?;
3296                    let buffer = buffer_store
3297                        .update(&mut cx, |buffer_store, cx| {
3298                            buffer_store.wait_for_remote_buffer(buffer_id, cx)
3299                        })?
3300                        .await?;
3301                    if let Some(language_registry) = languages {
3302                        let git_commit_language =
3303                            language_registry.language_for_name("Git Commit").await?;
3304                        buffer.update(&mut cx, |buffer, cx| {
3305                            buffer.set_language(Some(git_commit_language), cx);
3306                        })?;
3307                    }
3308                    this.update(&mut cx, |this, _| {
3309                        this.commit_message_buffer = Some(buffer.clone());
3310                    })?;
3311                    Ok(buffer)
3312                }
3313            }
3314        });
3315
3316        cx.spawn(|_, _: &mut AsyncApp| async move { rx.await? })
3317    }
3318
3319    fn open_local_commit_buffer(
3320        language_registry: Option<Arc<LanguageRegistry>>,
3321        buffer_store: Entity<BufferStore>,
3322        cx: &mut Context<Self>,
3323    ) -> Task<Result<Entity<Buffer>>> {
3324        cx.spawn(async move |repository, cx| {
3325            let buffer = buffer_store
3326                .update(cx, |buffer_store, cx| buffer_store.create_buffer(cx))?
3327                .await?;
3328
3329            if let Some(language_registry) = language_registry {
3330                let git_commit_language = language_registry.language_for_name("Git Commit").await?;
3331                buffer.update(cx, |buffer, cx| {
3332                    buffer.set_language(Some(git_commit_language), cx);
3333                })?;
3334            }
3335
3336            repository.update(cx, |repository, _| {
3337                repository.commit_message_buffer = Some(buffer.clone());
3338            })?;
3339            Ok(buffer)
3340        })
3341    }
3342
3343    pub fn checkout_files(
3344        &mut self,
3345        commit: &str,
3346        paths: Vec<RepoPath>,
3347        _cx: &mut App,
3348    ) -> oneshot::Receiver<Result<()>> {
3349        let commit = commit.to_string();
3350        let id = self.id;
3351
3352        self.send_job(
3353            Some(format!("git checkout {}", commit).into()),
3354            move |git_repo, _| async move {
3355                match git_repo {
3356                    RepositoryState::Local {
3357                        backend,
3358                        environment,
3359                        ..
3360                    } => {
3361                        backend
3362                            .checkout_files(commit, paths, environment.clone())
3363                            .await
3364                    }
3365                    RepositoryState::Remote { project_id, client } => {
3366                        client
3367                            .request(proto::GitCheckoutFiles {
3368                                project_id: project_id.0,
3369                                repository_id: id.to_proto(),
3370                                commit,
3371                                paths: paths
3372                                    .into_iter()
3373                                    .map(|p| p.to_string_lossy().to_string())
3374                                    .collect(),
3375                            })
3376                            .await?;
3377
3378                        Ok(())
3379                    }
3380                }
3381            },
3382        )
3383    }
3384
3385    pub fn reset(
3386        &mut self,
3387        commit: String,
3388        reset_mode: ResetMode,
3389        _cx: &mut App,
3390    ) -> oneshot::Receiver<Result<()>> {
3391        let id = self.id;
3392
3393        self.send_job(None, move |git_repo, _| async move {
3394            match git_repo {
3395                RepositoryState::Local {
3396                    backend,
3397                    environment,
3398                    ..
3399                } => backend.reset(commit, reset_mode, environment).await,
3400                RepositoryState::Remote { project_id, client } => {
3401                    client
3402                        .request(proto::GitReset {
3403                            project_id: project_id.0,
3404                            repository_id: id.to_proto(),
3405                            commit,
3406                            mode: match reset_mode {
3407                                ResetMode::Soft => git_reset::ResetMode::Soft.into(),
3408                                ResetMode::Mixed => git_reset::ResetMode::Mixed.into(),
3409                            },
3410                        })
3411                        .await?;
3412
3413                    Ok(())
3414                }
3415            }
3416        })
3417    }
3418
3419    pub fn show(&mut self, commit: String) -> oneshot::Receiver<Result<CommitDetails>> {
3420        let id = self.id;
3421        self.send_job(None, move |git_repo, _cx| async move {
3422            match git_repo {
3423                RepositoryState::Local { backend, .. } => backend.show(commit).await,
3424                RepositoryState::Remote { project_id, client } => {
3425                    let resp = client
3426                        .request(proto::GitShow {
3427                            project_id: project_id.0,
3428                            repository_id: id.to_proto(),
3429                            commit,
3430                        })
3431                        .await?;
3432
3433                    Ok(CommitDetails {
3434                        sha: resp.sha.into(),
3435                        message: resp.message.into(),
3436                        commit_timestamp: resp.commit_timestamp,
3437                        author_email: resp.author_email.into(),
3438                        author_name: resp.author_name.into(),
3439                    })
3440                }
3441            }
3442        })
3443    }
3444
3445    pub fn load_commit_diff(&mut self, commit: String) -> oneshot::Receiver<Result<CommitDiff>> {
3446        let id = self.id;
3447        self.send_job(None, move |git_repo, cx| async move {
3448            match git_repo {
3449                RepositoryState::Local { backend, .. } => backend.load_commit(commit, cx).await,
3450                RepositoryState::Remote {
3451                    client, project_id, ..
3452                } => {
3453                    let response = client
3454                        .request(proto::LoadCommitDiff {
3455                            project_id: project_id.0,
3456                            repository_id: id.to_proto(),
3457                            commit,
3458                        })
3459                        .await?;
3460                    Ok(CommitDiff {
3461                        files: response
3462                            .files
3463                            .into_iter()
3464                            .map(|file| CommitFile {
3465                                path: Path::new(&file.path).into(),
3466                                old_text: file.old_text,
3467                                new_text: file.new_text,
3468                            })
3469                            .collect(),
3470                    })
3471                }
3472            }
3473        })
3474    }
3475
3476    pub fn merge_base(
3477        &mut self,
3478        commit_a: String,
3479        commit_b: String,
3480    ) -> oneshot::Receiver<Option<String>> {
3481        let id = self.id;
3482        self.send_job(None, move |git_repo, cx| async move {
3483            match git_repo {
3484                RepositoryState::Local { backend, .. } => {
3485                    backend.merge_base(commit_a, commit_b).await
3486                }
3487                RepositoryState::Remote {
3488                    client, project_id, ..
3489                } => {
3490                    todo!();
3491                }
3492            }
3493        })
3494    }
3495
3496    pub fn diff_to_commit(&mut self, commit: String) -> oneshot::Receiver<Result<CommitDiff>> {
3497        let id = self.id;
3498        self.send_job(None, move |git_repo, cx| async move {
3499            match git_repo {
3500                RepositoryState::Local { backend, .. } => backend.diff_to_commit(commit, cx).await,
3501                RepositoryState::Remote {
3502                    client, project_id, ..
3503                } => {
3504                    todo!();
3505                    let response = client
3506                        .request(proto::LoadCommitDiff {
3507                            project_id: project_id.0,
3508                            repository_id: id.to_proto(),
3509                            commit,
3510                        })
3511                        .await?;
3512                    Ok(CommitDiff {
3513                        files: response
3514                            .files
3515                            .into_iter()
3516                            .map(|file| CommitFile {
3517                                path: Path::new(&file.path).into(),
3518                                old_text: file.old_text,
3519                                new_text: file.new_text,
3520                            })
3521                            .collect(),
3522                    })
3523                }
3524            }
3525        })
3526    }
3527
3528    fn buffer_store(&self, cx: &App) -> Option<Entity<BufferStore>> {
3529        Some(self.git_store.upgrade()?.read(cx).buffer_store.clone())
3530    }
3531
3532    pub fn stage_entries(
3533        &self,
3534        entries: Vec<RepoPath>,
3535        cx: &mut Context<Self>,
3536    ) -> Task<anyhow::Result<()>> {
3537        if entries.is_empty() {
3538            return Task::ready(Ok(()));
3539        }
3540        let id = self.id;
3541
3542        let mut save_futures = Vec::new();
3543        if let Some(buffer_store) = self.buffer_store(cx) {
3544            buffer_store.update(cx, |buffer_store, cx| {
3545                for path in &entries {
3546                    let Some(project_path) = self.repo_path_to_project_path(path, cx) else {
3547                        continue;
3548                    };
3549                    if let Some(buffer) = buffer_store.get_by_path(&project_path)
3550                        && buffer
3551                            .read(cx)
3552                            .file()
3553                            .is_some_and(|file| file.disk_state().exists())
3554                    {
3555                        save_futures.push(buffer_store.save_buffer(buffer, cx));
3556                    }
3557                }
3558            })
3559        }
3560
3561        cx.spawn(async move |this, cx| {
3562            for save_future in save_futures {
3563                save_future.await?;
3564            }
3565
3566            this.update(cx, |this, _| {
3567                this.send_job(None, move |git_repo, _cx| async move {
3568                    match git_repo {
3569                        RepositoryState::Local {
3570                            backend,
3571                            environment,
3572                            ..
3573                        } => backend.stage_paths(entries, environment.clone()).await,
3574                        RepositoryState::Remote { project_id, client } => {
3575                            client
3576                                .request(proto::Stage {
3577                                    project_id: project_id.0,
3578                                    repository_id: id.to_proto(),
3579                                    paths: entries
3580                                        .into_iter()
3581                                        .map(|repo_path| repo_path.as_ref().to_proto())
3582                                        .collect(),
3583                                })
3584                                .await
3585                                .context("sending stage request")?;
3586
3587                            Ok(())
3588                        }
3589                    }
3590                })
3591            })?
3592            .await??;
3593
3594            Ok(())
3595        })
3596    }
3597
3598    pub fn unstage_entries(
3599        &self,
3600        entries: Vec<RepoPath>,
3601        cx: &mut Context<Self>,
3602    ) -> Task<anyhow::Result<()>> {
3603        if entries.is_empty() {
3604            return Task::ready(Ok(()));
3605        }
3606        let id = self.id;
3607
3608        let mut save_futures = Vec::new();
3609        if let Some(buffer_store) = self.buffer_store(cx) {
3610            buffer_store.update(cx, |buffer_store, cx| {
3611                for path in &entries {
3612                    let Some(project_path) = self.repo_path_to_project_path(path, cx) else {
3613                        continue;
3614                    };
3615                    if let Some(buffer) = buffer_store.get_by_path(&project_path)
3616                        && buffer
3617                            .read(cx)
3618                            .file()
3619                            .is_some_and(|file| file.disk_state().exists())
3620                    {
3621                        save_futures.push(buffer_store.save_buffer(buffer, cx));
3622                    }
3623                }
3624            })
3625        }
3626
3627        cx.spawn(async move |this, cx| {
3628            for save_future in save_futures {
3629                save_future.await?;
3630            }
3631
3632            this.update(cx, |this, _| {
3633                this.send_job(None, move |git_repo, _cx| async move {
3634                    match git_repo {
3635                        RepositoryState::Local {
3636                            backend,
3637                            environment,
3638                            ..
3639                        } => backend.unstage_paths(entries, environment).await,
3640                        RepositoryState::Remote { project_id, client } => {
3641                            client
3642                                .request(proto::Unstage {
3643                                    project_id: project_id.0,
3644                                    repository_id: id.to_proto(),
3645                                    paths: entries
3646                                        .into_iter()
3647                                        .map(|repo_path| repo_path.as_ref().to_proto())
3648                                        .collect(),
3649                                })
3650                                .await
3651                                .context("sending unstage request")?;
3652
3653                            Ok(())
3654                        }
3655                    }
3656                })
3657            })?
3658            .await??;
3659
3660            Ok(())
3661        })
3662    }
3663
3664    pub fn stage_all(&self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
3665        let to_stage = self
3666            .cached_status()
3667            .filter(|entry| !entry.status.staging().is_fully_staged())
3668            .map(|entry| entry.repo_path)
3669            .collect();
3670        self.stage_entries(to_stage, cx)
3671    }
3672
3673    pub fn unstage_all(&self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
3674        let to_unstage = self
3675            .cached_status()
3676            .filter(|entry| entry.status.staging().has_staged())
3677            .map(|entry| entry.repo_path)
3678            .collect();
3679        self.unstage_entries(to_unstage, cx)
3680    }
3681
3682    pub fn stash_all(&mut self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
3683        let to_stash = self.cached_status().map(|entry| entry.repo_path).collect();
3684
3685        self.stash_entries(to_stash, cx)
3686    }
3687
3688    pub fn stash_entries(
3689        &mut self,
3690        entries: Vec<RepoPath>,
3691        cx: &mut Context<Self>,
3692    ) -> Task<anyhow::Result<()>> {
3693        let id = self.id;
3694
3695        cx.spawn(async move |this, cx| {
3696            this.update(cx, |this, _| {
3697                this.send_job(None, move |git_repo, _cx| async move {
3698                    match git_repo {
3699                        RepositoryState::Local {
3700                            backend,
3701                            environment,
3702                            ..
3703                        } => backend.stash_paths(entries, environment).await,
3704                        RepositoryState::Remote { project_id, client } => {
3705                            client
3706                                .request(proto::Stash {
3707                                    project_id: project_id.0,
3708                                    repository_id: id.to_proto(),
3709                                    paths: entries
3710                                        .into_iter()
3711                                        .map(|repo_path| repo_path.as_ref().to_proto())
3712                                        .collect(),
3713                                })
3714                                .await
3715                                .context("sending stash request")?;
3716                            Ok(())
3717                        }
3718                    }
3719                })
3720            })?
3721            .await??;
3722            Ok(())
3723        })
3724    }
3725
3726    pub fn stash_pop(&mut self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
3727        let id = self.id;
3728        cx.spawn(async move |this, cx| {
3729            this.update(cx, |this, _| {
3730                this.send_job(None, move |git_repo, _cx| async move {
3731                    match git_repo {
3732                        RepositoryState::Local {
3733                            backend,
3734                            environment,
3735                            ..
3736                        } => backend.stash_pop(environment).await,
3737                        RepositoryState::Remote { project_id, client } => {
3738                            client
3739                                .request(proto::StashPop {
3740                                    project_id: project_id.0,
3741                                    repository_id: id.to_proto(),
3742                                })
3743                                .await
3744                                .context("sending stash pop request")?;
3745                            Ok(())
3746                        }
3747                    }
3748                })
3749            })?
3750            .await??;
3751            Ok(())
3752        })
3753    }
3754
3755    pub fn commit(
3756        &mut self,
3757        message: SharedString,
3758        name_and_email: Option<(SharedString, SharedString)>,
3759        options: CommitOptions,
3760        _cx: &mut App,
3761    ) -> oneshot::Receiver<Result<()>> {
3762        let id = self.id;
3763
3764        self.send_job(Some("git commit".into()), move |git_repo, _cx| async move {
3765            match git_repo {
3766                RepositoryState::Local {
3767                    backend,
3768                    environment,
3769                    ..
3770                } => {
3771                    backend
3772                        .commit(message, name_and_email, options, environment)
3773                        .await
3774                }
3775                RepositoryState::Remote { project_id, client } => {
3776                    let (name, email) = name_and_email.unzip();
3777                    client
3778                        .request(proto::Commit {
3779                            project_id: project_id.0,
3780                            repository_id: id.to_proto(),
3781                            message: String::from(message),
3782                            name: name.map(String::from),
3783                            email: email.map(String::from),
3784                            options: Some(proto::commit::CommitOptions {
3785                                amend: options.amend,
3786                                signoff: options.signoff,
3787                            }),
3788                        })
3789                        .await
3790                        .context("sending commit request")?;
3791
3792                    Ok(())
3793                }
3794            }
3795        })
3796    }
3797
3798    pub fn fetch(
3799        &mut self,
3800        fetch_options: FetchOptions,
3801        askpass: AskPassDelegate,
3802        _cx: &mut App,
3803    ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
3804        let askpass_delegates = self.askpass_delegates.clone();
3805        let askpass_id = util::post_inc(&mut self.latest_askpass_id);
3806        let id = self.id;
3807
3808        self.send_job(Some("git fetch".into()), move |git_repo, cx| async move {
3809            match git_repo {
3810                RepositoryState::Local {
3811                    backend,
3812                    environment,
3813                    ..
3814                } => backend.fetch(fetch_options, askpass, environment, cx).await,
3815                RepositoryState::Remote { project_id, client } => {
3816                    askpass_delegates.lock().insert(askpass_id, askpass);
3817                    let _defer = util::defer(|| {
3818                        let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
3819                        debug_assert!(askpass_delegate.is_some());
3820                    });
3821
3822                    let response = client
3823                        .request(proto::Fetch {
3824                            project_id: project_id.0,
3825                            repository_id: id.to_proto(),
3826                            askpass_id,
3827                            remote: fetch_options.to_proto(),
3828                        })
3829                        .await
3830                        .context("sending fetch request")?;
3831
3832                    Ok(RemoteCommandOutput {
3833                        stdout: response.stdout,
3834                        stderr: response.stderr,
3835                    })
3836                }
3837            }
3838        })
3839    }
3840
3841    pub fn push(
3842        &mut self,
3843        branch: SharedString,
3844        remote: SharedString,
3845        options: Option<PushOptions>,
3846        askpass: AskPassDelegate,
3847        cx: &mut Context<Self>,
3848    ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
3849        let askpass_delegates = self.askpass_delegates.clone();
3850        let askpass_id = util::post_inc(&mut self.latest_askpass_id);
3851        let id = self.id;
3852
3853        let args = options
3854            .map(|option| match option {
3855                PushOptions::SetUpstream => " --set-upstream",
3856                PushOptions::Force => " --force-with-lease",
3857            })
3858            .unwrap_or("");
3859
3860        let updates_tx = self
3861            .git_store()
3862            .and_then(|git_store| match &git_store.read(cx).state {
3863                GitStoreState::Local { downstream, .. } => downstream
3864                    .as_ref()
3865                    .map(|downstream| downstream.updates_tx.clone()),
3866                _ => None,
3867            });
3868
3869        let this = cx.weak_entity();
3870        self.send_job(
3871            Some(format!("git push {} {} {}", args, branch, remote).into()),
3872            move |git_repo, mut cx| async move {
3873                match git_repo {
3874                    RepositoryState::Local {
3875                        backend,
3876                        environment,
3877                        ..
3878                    } => {
3879                        let result = backend
3880                            .push(
3881                                branch.to_string(),
3882                                remote.to_string(),
3883                                options,
3884                                askpass,
3885                                environment.clone(),
3886                                cx.clone(),
3887                            )
3888                            .await;
3889                        if result.is_ok() {
3890                            let branches = backend.branches().await?;
3891                            let branch = branches.into_iter().find(|branch| branch.is_head);
3892                            log::info!("head branch after scan is {branch:?}");
3893                            let snapshot = this.update(&mut cx, |this, cx| {
3894                                this.snapshot.branch = branch;
3895                                let snapshot = this.snapshot.clone();
3896                                cx.emit(RepositoryEvent::Updated {
3897                                    full_scan: false,
3898                                    new_instance: false,
3899                                });
3900                                snapshot
3901                            })?;
3902                            if let Some(updates_tx) = updates_tx {
3903                                updates_tx
3904                                    .unbounded_send(DownstreamUpdate::UpdateRepository(snapshot))
3905                                    .ok();
3906                            }
3907                        }
3908                        result
3909                    }
3910                    RepositoryState::Remote { project_id, client } => {
3911                        askpass_delegates.lock().insert(askpass_id, askpass);
3912                        let _defer = util::defer(|| {
3913                            let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
3914                            debug_assert!(askpass_delegate.is_some());
3915                        });
3916                        let response = client
3917                            .request(proto::Push {
3918                                project_id: project_id.0,
3919                                repository_id: id.to_proto(),
3920                                askpass_id,
3921                                branch_name: branch.to_string(),
3922                                remote_name: remote.to_string(),
3923                                options: options.map(|options| match options {
3924                                    PushOptions::Force => proto::push::PushOptions::Force,
3925                                    PushOptions::SetUpstream => {
3926                                        proto::push::PushOptions::SetUpstream
3927                                    }
3928                                }
3929                                    as i32),
3930                            })
3931                            .await
3932                            .context("sending push request")?;
3933
3934                        Ok(RemoteCommandOutput {
3935                            stdout: response.stdout,
3936                            stderr: response.stderr,
3937                        })
3938                    }
3939                }
3940            },
3941        )
3942    }
3943
3944    pub fn pull(
3945        &mut self,
3946        branch: SharedString,
3947        remote: SharedString,
3948        askpass: AskPassDelegate,
3949        _cx: &mut App,
3950    ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
3951        let askpass_delegates = self.askpass_delegates.clone();
3952        let askpass_id = util::post_inc(&mut self.latest_askpass_id);
3953        let id = self.id;
3954
3955        self.send_job(
3956            Some(format!("git pull {} {}", remote, branch).into()),
3957            move |git_repo, cx| async move {
3958                match git_repo {
3959                    RepositoryState::Local {
3960                        backend,
3961                        environment,
3962                        ..
3963                    } => {
3964                        backend
3965                            .pull(
3966                                branch.to_string(),
3967                                remote.to_string(),
3968                                askpass,
3969                                environment.clone(),
3970                                cx,
3971                            )
3972                            .await
3973                    }
3974                    RepositoryState::Remote { project_id, client } => {
3975                        askpass_delegates.lock().insert(askpass_id, askpass);
3976                        let _defer = util::defer(|| {
3977                            let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
3978                            debug_assert!(askpass_delegate.is_some());
3979                        });
3980                        let response = client
3981                            .request(proto::Pull {
3982                                project_id: project_id.0,
3983                                repository_id: id.to_proto(),
3984                                askpass_id,
3985                                branch_name: branch.to_string(),
3986                                remote_name: remote.to_string(),
3987                            })
3988                            .await
3989                            .context("sending pull request")?;
3990
3991                        Ok(RemoteCommandOutput {
3992                            stdout: response.stdout,
3993                            stderr: response.stderr,
3994                        })
3995                    }
3996                }
3997            },
3998        )
3999    }
4000
4001    fn spawn_set_index_text_job(
4002        &mut self,
4003        path: RepoPath,
4004        content: Option<String>,
4005        hunk_staging_operation_count: Option<usize>,
4006        cx: &mut Context<Self>,
4007    ) -> oneshot::Receiver<anyhow::Result<()>> {
4008        let id = self.id;
4009        let this = cx.weak_entity();
4010        let git_store = self.git_store.clone();
4011        self.send_keyed_job(
4012            Some(GitJobKey::WriteIndex(path.clone())),
4013            None,
4014            move |git_repo, mut cx| async move {
4015                log::debug!("start updating index text for buffer {}", path.display());
4016                match git_repo {
4017                    RepositoryState::Local {
4018                        backend,
4019                        environment,
4020                        ..
4021                    } => {
4022                        backend
4023                            .set_index_text(path.clone(), content, environment.clone())
4024                            .await?;
4025                    }
4026                    RepositoryState::Remote { project_id, client } => {
4027                        client
4028                            .request(proto::SetIndexText {
4029                                project_id: project_id.0,
4030                                repository_id: id.to_proto(),
4031                                path: path.as_ref().to_proto(),
4032                                text: content,
4033                            })
4034                            .await?;
4035                    }
4036                }
4037                log::debug!("finish updating index text for buffer {}", path.display());
4038
4039                if let Some(hunk_staging_operation_count) = hunk_staging_operation_count {
4040                    let project_path = this
4041                        .read_with(&cx, |this, cx| this.repo_path_to_project_path(&path, cx))
4042                        .ok()
4043                        .flatten();
4044                    git_store.update(&mut cx, |git_store, cx| {
4045                        let buffer_id = git_store
4046                            .buffer_store
4047                            .read(cx)
4048                            .get_by_path(&project_path?)?
4049                            .read(cx)
4050                            .remote_id();
4051                        let diff_state = git_store.diffs.get(&buffer_id)?;
4052                        diff_state.update(cx, |diff_state, _| {
4053                            diff_state.hunk_staging_operation_count_as_of_write =
4054                                hunk_staging_operation_count;
4055                        });
4056                        Some(())
4057                    })?;
4058                }
4059                Ok(())
4060            },
4061        )
4062    }
4063
4064    pub fn get_remotes(
4065        &mut self,
4066        branch_name: Option<String>,
4067    ) -> oneshot::Receiver<Result<Vec<Remote>>> {
4068        let id = self.id;
4069        self.send_job(None, move |repo, _cx| async move {
4070            match repo {
4071                RepositoryState::Local { backend, .. } => backend.get_remotes(branch_name).await,
4072                RepositoryState::Remote { project_id, client } => {
4073                    let response = client
4074                        .request(proto::GetRemotes {
4075                            project_id: project_id.0,
4076                            repository_id: id.to_proto(),
4077                            branch_name,
4078                        })
4079                        .await?;
4080
4081                    let remotes = response
4082                        .remotes
4083                        .into_iter()
4084                        .map(|remotes| git::repository::Remote {
4085                            name: remotes.name.into(),
4086                        })
4087                        .collect();
4088
4089                    Ok(remotes)
4090                }
4091            }
4092        })
4093    }
4094
4095    pub fn branches(&mut self) -> oneshot::Receiver<Result<Vec<Branch>>> {
4096        let id = self.id;
4097        self.send_job(None, move |repo, _| async move {
4098            match repo {
4099                RepositoryState::Local { backend, .. } => backend.branches().await,
4100                RepositoryState::Remote { project_id, client } => {
4101                    let response = client
4102                        .request(proto::GitGetBranches {
4103                            project_id: project_id.0,
4104                            repository_id: id.to_proto(),
4105                        })
4106                        .await?;
4107
4108                    let branches = response
4109                        .branches
4110                        .into_iter()
4111                        .map(|branch| proto_to_branch(&branch))
4112                        .collect();
4113
4114                    Ok(branches)
4115                }
4116            }
4117        })
4118    }
4119
4120    pub fn default_branch(&mut self) -> oneshot::Receiver<Result<Option<SharedString>>> {
4121        let id = self.id;
4122        self.send_job(None, move |repo, _| async move {
4123            match repo {
4124                RepositoryState::Local { backend, .. } => backend.default_branch().await,
4125                RepositoryState::Remote { project_id, client } => {
4126                    let response = client
4127                        .request(proto::GetDefaultBranch {
4128                            project_id: project_id.0,
4129                            repository_id: id.to_proto(),
4130                        })
4131                        .await?;
4132
4133                    anyhow::Ok(response.branch.map(SharedString::from))
4134                }
4135            }
4136        })
4137    }
4138
4139    pub fn diff(&mut self, diff_type: DiffType, _cx: &App) -> oneshot::Receiver<Result<String>> {
4140        let id = self.id;
4141        self.send_job(None, move |repo, _cx| async move {
4142            match repo {
4143                RepositoryState::Local { backend, .. } => backend.diff(diff_type).await,
4144                RepositoryState::Remote { project_id, client } => {
4145                    let response = client
4146                        .request(proto::GitDiff {
4147                            project_id: project_id.0,
4148                            repository_id: id.to_proto(),
4149                            diff_type: match diff_type {
4150                                DiffType::HeadToIndex => {
4151                                    proto::git_diff::DiffType::HeadToIndex.into()
4152                                }
4153                                DiffType::HeadToWorktree => {
4154                                    proto::git_diff::DiffType::HeadToWorktree.into()
4155                                }
4156                            },
4157                        })
4158                        .await?;
4159
4160                    Ok(response.diff)
4161                }
4162            }
4163        })
4164    }
4165
4166    pub fn create_branch(&mut self, branch_name: String) -> oneshot::Receiver<Result<()>> {
4167        let id = self.id;
4168        self.send_job(
4169            Some(format!("git switch -c {branch_name}").into()),
4170            move |repo, _cx| async move {
4171                match repo {
4172                    RepositoryState::Local { backend, .. } => {
4173                        backend.create_branch(branch_name).await
4174                    }
4175                    RepositoryState::Remote { project_id, client } => {
4176                        client
4177                            .request(proto::GitCreateBranch {
4178                                project_id: project_id.0,
4179                                repository_id: id.to_proto(),
4180                                branch_name,
4181                            })
4182                            .await?;
4183
4184                        Ok(())
4185                    }
4186                }
4187            },
4188        )
4189    }
4190
4191    pub fn change_branch(&mut self, branch_name: String) -> oneshot::Receiver<Result<()>> {
4192        let id = self.id;
4193        self.send_job(
4194            Some(format!("git switch {branch_name}").into()),
4195            move |repo, _cx| async move {
4196                match repo {
4197                    RepositoryState::Local { backend, .. } => {
4198                        backend.change_branch(branch_name).await
4199                    }
4200                    RepositoryState::Remote { project_id, client } => {
4201                        client
4202                            .request(proto::GitChangeBranch {
4203                                project_id: project_id.0,
4204                                repository_id: id.to_proto(),
4205                                branch_name,
4206                            })
4207                            .await?;
4208
4209                        Ok(())
4210                    }
4211                }
4212            },
4213        )
4214    }
4215
4216    pub fn check_for_pushed_commits(&mut self) -> oneshot::Receiver<Result<Vec<SharedString>>> {
4217        let id = self.id;
4218        self.send_job(None, move |repo, _cx| async move {
4219            match repo {
4220                RepositoryState::Local { backend, .. } => backend.check_for_pushed_commit().await,
4221                RepositoryState::Remote { project_id, client } => {
4222                    let response = client
4223                        .request(proto::CheckForPushedCommits {
4224                            project_id: project_id.0,
4225                            repository_id: id.to_proto(),
4226                        })
4227                        .await?;
4228
4229                    let branches = response.pushed_to.into_iter().map(Into::into).collect();
4230
4231                    Ok(branches)
4232                }
4233            }
4234        })
4235    }
4236
4237    pub fn checkpoint(&mut self) -> oneshot::Receiver<Result<GitRepositoryCheckpoint>> {
4238        self.send_job(None, |repo, _cx| async move {
4239            match repo {
4240                RepositoryState::Local { backend, .. } => backend.checkpoint().await,
4241                RepositoryState::Remote { .. } => anyhow::bail!("not implemented yet"),
4242            }
4243        })
4244    }
4245
4246    pub fn restore_checkpoint(
4247        &mut self,
4248        checkpoint: GitRepositoryCheckpoint,
4249    ) -> oneshot::Receiver<Result<()>> {
4250        self.send_job(None, move |repo, _cx| async move {
4251            match repo {
4252                RepositoryState::Local { backend, .. } => {
4253                    backend.restore_checkpoint(checkpoint).await
4254                }
4255                RepositoryState::Remote { .. } => anyhow::bail!("not implemented yet"),
4256            }
4257        })
4258    }
4259
4260    pub(crate) fn apply_remote_update(
4261        &mut self,
4262        update: proto::UpdateRepository,
4263        is_new: bool,
4264        cx: &mut Context<Self>,
4265    ) -> Result<()> {
4266        let conflicted_paths = TreeSet::from_ordered_entries(
4267            update
4268                .current_merge_conflicts
4269                .into_iter()
4270                .map(|path| RepoPath(Path::new(&path).into())),
4271        );
4272        self.snapshot.branch = update.branch_summary.as_ref().map(proto_to_branch);
4273        self.snapshot.head_commit = update
4274            .head_commit_details
4275            .as_ref()
4276            .map(proto_to_commit_details);
4277
4278        self.snapshot.merge.conflicted_paths = conflicted_paths;
4279        self.snapshot.merge.message = update.merge_message.map(SharedString::from);
4280
4281        let edits = update
4282            .removed_statuses
4283            .into_iter()
4284            .map(|path| sum_tree::Edit::Remove(PathKey(FromProto::from_proto(path))))
4285            .chain(
4286                update
4287                    .updated_statuses
4288                    .into_iter()
4289                    .filter_map(|updated_status| {
4290                        Some(sum_tree::Edit::Insert(updated_status.try_into().log_err()?))
4291                    }),
4292            )
4293            .collect::<Vec<_>>();
4294        self.snapshot.statuses_by_path.edit(edits, &());
4295        if update.is_last_update {
4296            self.snapshot.scan_id = update.scan_id;
4297        }
4298        cx.emit(RepositoryEvent::Updated {
4299            full_scan: true,
4300            new_instance: is_new,
4301        });
4302        Ok(())
4303    }
4304
4305    pub fn compare_checkpoints(
4306        &mut self,
4307        left: GitRepositoryCheckpoint,
4308        right: GitRepositoryCheckpoint,
4309    ) -> oneshot::Receiver<Result<bool>> {
4310        self.send_job(None, move |repo, _cx| async move {
4311            match repo {
4312                RepositoryState::Local { backend, .. } => {
4313                    backend.compare_checkpoints(left, right).await
4314                }
4315                RepositoryState::Remote { .. } => anyhow::bail!("not implemented yet"),
4316            }
4317        })
4318    }
4319
4320    pub fn diff_checkpoints(
4321        &mut self,
4322        base_checkpoint: GitRepositoryCheckpoint,
4323        target_checkpoint: GitRepositoryCheckpoint,
4324    ) -> oneshot::Receiver<Result<String>> {
4325        self.send_job(None, move |repo, _cx| async move {
4326            match repo {
4327                RepositoryState::Local { backend, .. } => {
4328                    backend
4329                        .diff_checkpoints(base_checkpoint, target_checkpoint)
4330                        .await
4331                }
4332                RepositoryState::Remote { .. } => anyhow::bail!("not implemented yet"),
4333            }
4334        })
4335    }
4336
4337    fn schedule_scan(
4338        &mut self,
4339        updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
4340        cx: &mut Context<Self>,
4341    ) {
4342        let this = cx.weak_entity();
4343        let _ = self.send_keyed_job(
4344            Some(GitJobKey::ReloadGitState),
4345            None,
4346            |state, mut cx| async move {
4347                log::debug!("run scheduled git status scan");
4348
4349                let Some(this) = this.upgrade() else {
4350                    return Ok(());
4351                };
4352                let RepositoryState::Local { backend, .. } = state else {
4353                    bail!("not a local repository")
4354                };
4355                let (snapshot, events) = this
4356                    .update(&mut cx, |this, _| {
4357                        this.paths_needing_status_update.clear();
4358                        compute_snapshot(
4359                            this.id,
4360                            this.work_directory_abs_path.clone(),
4361                            this.snapshot.clone(),
4362                            backend.clone(),
4363                        )
4364                    })?
4365                    .await?;
4366                this.update(&mut cx, |this, cx| {
4367                    this.snapshot = snapshot.clone();
4368                    for event in events {
4369                        cx.emit(event);
4370                    }
4371                })?;
4372                if let Some(updates_tx) = updates_tx {
4373                    updates_tx
4374                        .unbounded_send(DownstreamUpdate::UpdateRepository(snapshot))
4375                        .ok();
4376                }
4377                Ok(())
4378            },
4379        );
4380    }
4381
4382    fn spawn_local_git_worker(
4383        work_directory_abs_path: Arc<Path>,
4384        dot_git_abs_path: Arc<Path>,
4385        _repository_dir_abs_path: Arc<Path>,
4386        _common_dir_abs_path: Arc<Path>,
4387        project_environment: WeakEntity<ProjectEnvironment>,
4388        fs: Arc<dyn Fs>,
4389        cx: &mut Context<Self>,
4390    ) -> mpsc::UnboundedSender<GitJob> {
4391        let (job_tx, mut job_rx) = mpsc::unbounded::<GitJob>();
4392
4393        cx.spawn(async move |_, cx| {
4394            let environment = project_environment
4395                .upgrade()
4396                .context("missing project environment")?
4397                .update(cx, |project_environment, cx| {
4398                    project_environment.get_directory_environment(work_directory_abs_path.clone(), cx)
4399                })?
4400                .await
4401                .unwrap_or_else(|| {
4402                    log::error!("failed to get working directory environment for repository {work_directory_abs_path:?}");
4403                    HashMap::default()
4404                });
4405            let backend = cx
4406                .background_spawn(async move {
4407                    fs.open_repo(&dot_git_abs_path)
4408                        .with_context(|| format!("opening repository at {dot_git_abs_path:?}"))
4409                })
4410                .await?;
4411
4412            if let Some(git_hosting_provider_registry) =
4413                cx.update(|cx| GitHostingProviderRegistry::try_global(cx))?
4414            {
4415                git_hosting_providers::register_additional_providers(
4416                    git_hosting_provider_registry,
4417                    backend.clone(),
4418                );
4419            }
4420
4421            let state = RepositoryState::Local {
4422                backend,
4423                environment: Arc::new(environment),
4424            };
4425            let mut jobs = VecDeque::new();
4426            loop {
4427                while let Ok(Some(next_job)) = job_rx.try_next() {
4428                    jobs.push_back(next_job);
4429                }
4430
4431                if let Some(job) = jobs.pop_front() {
4432                    if let Some(current_key) = &job.key
4433                        && jobs
4434                            .iter()
4435                            .any(|other_job| other_job.key.as_ref() == Some(current_key))
4436                        {
4437                            continue;
4438                        }
4439                    (job.job)(state.clone(), cx).await;
4440                } else if let Some(job) = job_rx.next().await {
4441                    jobs.push_back(job);
4442                } else {
4443                    break;
4444                }
4445            }
4446            anyhow::Ok(())
4447        })
4448        .detach_and_log_err(cx);
4449
4450        job_tx
4451    }
4452
4453    fn spawn_remote_git_worker(
4454        project_id: ProjectId,
4455        client: AnyProtoClient,
4456        cx: &mut Context<Self>,
4457    ) -> mpsc::UnboundedSender<GitJob> {
4458        let (job_tx, mut job_rx) = mpsc::unbounded::<GitJob>();
4459
4460        cx.spawn(async move |_, cx| {
4461            let state = RepositoryState::Remote { project_id, client };
4462            let mut jobs = VecDeque::new();
4463            loop {
4464                while let Ok(Some(next_job)) = job_rx.try_next() {
4465                    jobs.push_back(next_job);
4466                }
4467
4468                if let Some(job) = jobs.pop_front() {
4469                    if let Some(current_key) = &job.key
4470                        && jobs
4471                            .iter()
4472                            .any(|other_job| other_job.key.as_ref() == Some(current_key))
4473                    {
4474                        continue;
4475                    }
4476                    (job.job)(state.clone(), cx).await;
4477                } else if let Some(job) = job_rx.next().await {
4478                    jobs.push_back(job);
4479                } else {
4480                    break;
4481                }
4482            }
4483            anyhow::Ok(())
4484        })
4485        .detach_and_log_err(cx);
4486
4487        job_tx
4488    }
4489
4490    fn load_staged_text(
4491        &mut self,
4492        buffer_id: BufferId,
4493        repo_path: RepoPath,
4494        cx: &App,
4495    ) -> Task<Result<Option<String>>> {
4496        let rx = self.send_job(None, move |state, _| async move {
4497            match state {
4498                RepositoryState::Local { backend, .. } => {
4499                    anyhow::Ok(backend.load_index_text(repo_path).await)
4500                }
4501                RepositoryState::Remote { project_id, client } => {
4502                    let response = client
4503                        .request(proto::OpenUnstagedDiff {
4504                            project_id: project_id.to_proto(),
4505                            buffer_id: buffer_id.to_proto(),
4506                        })
4507                        .await?;
4508                    Ok(response.staged_text)
4509                }
4510            }
4511        });
4512        cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
4513    }
4514
4515    fn load_committed_text(
4516        &mut self,
4517        buffer_id: BufferId,
4518        repo_path: RepoPath,
4519        cx: &App,
4520    ) -> Task<Result<DiffBasesChange>> {
4521        let rx = self.send_job(None, move |state, _| async move {
4522            match state {
4523                RepositoryState::Local { backend, .. } => {
4524                    let committed_text = backend.load_committed_text(repo_path.clone()).await;
4525                    let staged_text = backend.load_index_text(repo_path).await;
4526                    let diff_bases_change = if committed_text == staged_text {
4527                        DiffBasesChange::SetBoth(committed_text)
4528                    } else {
4529                        DiffBasesChange::SetEach {
4530                            index: staged_text,
4531                            head: committed_text,
4532                        }
4533                    };
4534                    anyhow::Ok(diff_bases_change)
4535                }
4536                RepositoryState::Remote { project_id, client } => {
4537                    use proto::open_uncommitted_diff_response::Mode;
4538
4539                    let response = client
4540                        .request(proto::OpenUncommittedDiff {
4541                            project_id: project_id.to_proto(),
4542                            buffer_id: buffer_id.to_proto(),
4543                        })
4544                        .await?;
4545                    let mode = Mode::from_i32(response.mode).context("Invalid mode")?;
4546                    let bases = match mode {
4547                        Mode::IndexMatchesHead => DiffBasesChange::SetBoth(response.committed_text),
4548                        Mode::IndexAndHead => DiffBasesChange::SetEach {
4549                            head: response.committed_text,
4550                            index: response.staged_text,
4551                        },
4552                    };
4553                    Ok(bases)
4554                }
4555            }
4556        });
4557
4558        cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
4559    }
4560
4561    fn paths_changed(
4562        &mut self,
4563        paths: Vec<RepoPath>,
4564        updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
4565        cx: &mut Context<Self>,
4566    ) {
4567        self.paths_needing_status_update.extend(paths);
4568
4569        let this = cx.weak_entity();
4570        let _ = self.send_keyed_job(
4571            Some(GitJobKey::RefreshStatuses),
4572            None,
4573            |state, mut cx| async move {
4574                let (prev_snapshot, mut changed_paths) = this.update(&mut cx, |this, _| {
4575                    (
4576                        this.snapshot.clone(),
4577                        mem::take(&mut this.paths_needing_status_update),
4578                    )
4579                })?;
4580                let RepositoryState::Local { backend, .. } = state else {
4581                    bail!("not a local repository")
4582                };
4583
4584                let paths = changed_paths.iter().cloned().collect::<Vec<_>>();
4585                if paths.is_empty() {
4586                    return Ok(());
4587                }
4588                let statuses = backend.status(&paths).await?;
4589
4590                let changed_path_statuses = cx
4591                    .background_spawn(async move {
4592                        let mut changed_path_statuses = Vec::new();
4593                        let prev_statuses = prev_snapshot.statuses_by_path.clone();
4594                        let mut cursor = prev_statuses.cursor::<PathProgress>(&());
4595
4596                        for (repo_path, status) in &*statuses.entries {
4597                            changed_paths.remove(repo_path);
4598                            if cursor.seek_forward(&PathTarget::Path(repo_path), Bias::Left)
4599                                && cursor.item().is_some_and(|entry| entry.status == *status)
4600                            {
4601                                continue;
4602                            }
4603
4604                            changed_path_statuses.push(Edit::Insert(StatusEntry {
4605                                repo_path: repo_path.clone(),
4606                                status: *status,
4607                            }));
4608                        }
4609                        let mut cursor = prev_statuses.cursor::<PathProgress>(&());
4610                        for path in changed_paths.into_iter() {
4611                            if cursor.seek_forward(&PathTarget::Path(&path), Bias::Left) {
4612                                changed_path_statuses.push(Edit::Remove(PathKey(path.0)));
4613                            }
4614                        }
4615                        changed_path_statuses
4616                    })
4617                    .await;
4618
4619                this.update(&mut cx, |this, cx| {
4620                    if !changed_path_statuses.is_empty() {
4621                        this.snapshot
4622                            .statuses_by_path
4623                            .edit(changed_path_statuses, &());
4624                        this.snapshot.scan_id += 1;
4625                        if let Some(updates_tx) = updates_tx {
4626                            updates_tx
4627                                .unbounded_send(DownstreamUpdate::UpdateRepository(
4628                                    this.snapshot.clone(),
4629                                ))
4630                                .ok();
4631                        }
4632                    }
4633                    cx.emit(RepositoryEvent::Updated {
4634                        full_scan: false,
4635                        new_instance: false,
4636                    });
4637                })
4638            },
4639        );
4640    }
4641
4642    /// currently running git command and when it started
4643    pub fn current_job(&self) -> Option<JobInfo> {
4644        self.active_jobs.values().next().cloned()
4645    }
4646
4647    pub fn barrier(&mut self) -> oneshot::Receiver<()> {
4648        self.send_job(None, |_, _| async {})
4649    }
4650}
4651
4652fn get_permalink_in_rust_registry_src(
4653    provider_registry: Arc<GitHostingProviderRegistry>,
4654    path: PathBuf,
4655    selection: Range<u32>,
4656) -> Result<url::Url> {
4657    #[derive(Deserialize)]
4658    struct CargoVcsGit {
4659        sha1: String,
4660    }
4661
4662    #[derive(Deserialize)]
4663    struct CargoVcsInfo {
4664        git: CargoVcsGit,
4665        path_in_vcs: String,
4666    }
4667
4668    #[derive(Deserialize)]
4669    struct CargoPackage {
4670        repository: String,
4671    }
4672
4673    #[derive(Deserialize)]
4674    struct CargoToml {
4675        package: CargoPackage,
4676    }
4677
4678    let Some((dir, cargo_vcs_info_json)) = path.ancestors().skip(1).find_map(|dir| {
4679        let json = std::fs::read_to_string(dir.join(".cargo_vcs_info.json")).ok()?;
4680        Some((dir, json))
4681    }) else {
4682        bail!("No .cargo_vcs_info.json found in parent directories")
4683    };
4684    let cargo_vcs_info = serde_json::from_str::<CargoVcsInfo>(&cargo_vcs_info_json)?;
4685    let cargo_toml = std::fs::read_to_string(dir.join("Cargo.toml"))?;
4686    let manifest = toml::from_str::<CargoToml>(&cargo_toml)?;
4687    let (provider, remote) = parse_git_remote_url(provider_registry, &manifest.package.repository)
4688        .context("parsing package.repository field of manifest")?;
4689    let path = PathBuf::from(cargo_vcs_info.path_in_vcs).join(path.strip_prefix(dir).unwrap());
4690    let permalink = provider.build_permalink(
4691        remote,
4692        BuildPermalinkParams {
4693            sha: &cargo_vcs_info.git.sha1,
4694            path: &path.to_string_lossy(),
4695            selection: Some(selection),
4696        },
4697    );
4698    Ok(permalink)
4699}
4700
4701fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
4702    let Some(blame) = blame else {
4703        return proto::BlameBufferResponse {
4704            blame_response: None,
4705        };
4706    };
4707
4708    let entries = blame
4709        .entries
4710        .into_iter()
4711        .map(|entry| proto::BlameEntry {
4712            sha: entry.sha.as_bytes().into(),
4713            start_line: entry.range.start,
4714            end_line: entry.range.end,
4715            original_line_number: entry.original_line_number,
4716            author: entry.author,
4717            author_mail: entry.author_mail,
4718            author_time: entry.author_time,
4719            author_tz: entry.author_tz,
4720            committer: entry.committer_name,
4721            committer_mail: entry.committer_email,
4722            committer_time: entry.committer_time,
4723            committer_tz: entry.committer_tz,
4724            summary: entry.summary,
4725            previous: entry.previous,
4726            filename: entry.filename,
4727        })
4728        .collect::<Vec<_>>();
4729
4730    let messages = blame
4731        .messages
4732        .into_iter()
4733        .map(|(oid, message)| proto::CommitMessage {
4734            oid: oid.as_bytes().into(),
4735            message,
4736        })
4737        .collect::<Vec<_>>();
4738
4739    proto::BlameBufferResponse {
4740        blame_response: Some(proto::blame_buffer_response::BlameResponse {
4741            entries,
4742            messages,
4743            remote_url: blame.remote_url,
4744        }),
4745    }
4746}
4747
4748fn deserialize_blame_buffer_response(
4749    response: proto::BlameBufferResponse,
4750) -> Option<git::blame::Blame> {
4751    let response = response.blame_response?;
4752    let entries = response
4753        .entries
4754        .into_iter()
4755        .filter_map(|entry| {
4756            Some(git::blame::BlameEntry {
4757                sha: git::Oid::from_bytes(&entry.sha).ok()?,
4758                range: entry.start_line..entry.end_line,
4759                original_line_number: entry.original_line_number,
4760                committer_name: entry.committer,
4761                committer_time: entry.committer_time,
4762                committer_tz: entry.committer_tz,
4763                committer_email: entry.committer_mail,
4764                author: entry.author,
4765                author_mail: entry.author_mail,
4766                author_time: entry.author_time,
4767                author_tz: entry.author_tz,
4768                summary: entry.summary,
4769                previous: entry.previous,
4770                filename: entry.filename,
4771            })
4772        })
4773        .collect::<Vec<_>>();
4774
4775    let messages = response
4776        .messages
4777        .into_iter()
4778        .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
4779        .collect::<HashMap<_, _>>();
4780
4781    Some(Blame {
4782        entries,
4783        messages,
4784        remote_url: response.remote_url,
4785    })
4786}
4787
4788fn branch_to_proto(branch: &git::repository::Branch) -> proto::Branch {
4789    proto::Branch {
4790        is_head: branch.is_head,
4791        ref_name: branch.ref_name.to_string(),
4792        unix_timestamp: branch
4793            .most_recent_commit
4794            .as_ref()
4795            .map(|commit| commit.commit_timestamp as u64),
4796        upstream: branch.upstream.as_ref().map(|upstream| proto::GitUpstream {
4797            ref_name: upstream.ref_name.to_string(),
4798            tracking: upstream
4799                .tracking
4800                .status()
4801                .map(|upstream| proto::UpstreamTracking {
4802                    ahead: upstream.ahead as u64,
4803                    behind: upstream.behind as u64,
4804                }),
4805        }),
4806        most_recent_commit: branch
4807            .most_recent_commit
4808            .as_ref()
4809            .map(|commit| proto::CommitSummary {
4810                sha: commit.sha.to_string(),
4811                subject: commit.subject.to_string(),
4812                commit_timestamp: commit.commit_timestamp,
4813            }),
4814    }
4815}
4816
4817fn proto_to_branch(proto: &proto::Branch) -> git::repository::Branch {
4818    git::repository::Branch {
4819        is_head: proto.is_head,
4820        ref_name: proto.ref_name.clone().into(),
4821        upstream: proto
4822            .upstream
4823            .as_ref()
4824            .map(|upstream| git::repository::Upstream {
4825                ref_name: upstream.ref_name.to_string().into(),
4826                tracking: upstream
4827                    .tracking
4828                    .as_ref()
4829                    .map(|tracking| {
4830                        git::repository::UpstreamTracking::Tracked(UpstreamTrackingStatus {
4831                            ahead: tracking.ahead as u32,
4832                            behind: tracking.behind as u32,
4833                        })
4834                    })
4835                    .unwrap_or(git::repository::UpstreamTracking::Gone),
4836            }),
4837        most_recent_commit: proto.most_recent_commit.as_ref().map(|commit| {
4838            git::repository::CommitSummary {
4839                sha: commit.sha.to_string().into(),
4840                subject: commit.subject.to_string().into(),
4841                commit_timestamp: commit.commit_timestamp,
4842                has_parent: true,
4843            }
4844        }),
4845    }
4846}
4847
4848fn commit_details_to_proto(commit: &CommitDetails) -> proto::GitCommitDetails {
4849    proto::GitCommitDetails {
4850        sha: commit.sha.to_string(),
4851        message: commit.message.to_string(),
4852        commit_timestamp: commit.commit_timestamp,
4853        author_email: commit.author_email.to_string(),
4854        author_name: commit.author_name.to_string(),
4855    }
4856}
4857
4858fn proto_to_commit_details(proto: &proto::GitCommitDetails) -> CommitDetails {
4859    CommitDetails {
4860        sha: proto.sha.clone().into(),
4861        message: proto.message.clone().into(),
4862        commit_timestamp: proto.commit_timestamp,
4863        author_email: proto.author_email.clone().into(),
4864        author_name: proto.author_name.clone().into(),
4865    }
4866}
4867
4868async fn compute_snapshot(
4869    id: RepositoryId,
4870    work_directory_abs_path: Arc<Path>,
4871    prev_snapshot: RepositorySnapshot,
4872    backend: Arc<dyn GitRepository>,
4873) -> Result<(RepositorySnapshot, Vec<RepositoryEvent>)> {
4874    let mut events = Vec::new();
4875    let branches = backend.branches().await?;
4876    let branch = branches.into_iter().find(|branch| branch.is_head);
4877    let statuses = backend
4878        .status(std::slice::from_ref(&WORK_DIRECTORY_REPO_PATH))
4879        .await?;
4880    let statuses_by_path = SumTree::from_iter(
4881        statuses
4882            .entries
4883            .iter()
4884            .map(|(repo_path, status)| StatusEntry {
4885                repo_path: repo_path.clone(),
4886                status: *status,
4887            }),
4888        &(),
4889    );
4890    let (merge_details, merge_heads_changed) =
4891        MergeDetails::load(&backend, &statuses_by_path, &prev_snapshot).await?;
4892    log::debug!("new merge details (changed={merge_heads_changed:?}): {merge_details:?}");
4893
4894    if merge_heads_changed
4895        || branch != prev_snapshot.branch
4896        || statuses_by_path != prev_snapshot.statuses_by_path
4897    {
4898        events.push(RepositoryEvent::Updated {
4899            full_scan: true,
4900            new_instance: false,
4901        });
4902    }
4903
4904    // Cache merge conflict paths so they don't change from staging/unstaging,
4905    // until the merge heads change (at commit time, etc.).
4906    if merge_heads_changed {
4907        events.push(RepositoryEvent::MergeHeadsChanged);
4908    }
4909
4910    // Useful when branch is None in detached head state
4911    let head_commit = match backend.head_sha().await {
4912        Some(head_sha) => backend.show(head_sha).await.log_err(),
4913        None => None,
4914    };
4915
4916    // Used by edit prediction data collection
4917    let remote_origin_url = backend.remote_url("origin");
4918    let remote_upstream_url = backend.remote_url("upstream");
4919
4920    let snapshot = RepositorySnapshot {
4921        id,
4922        statuses_by_path,
4923        work_directory_abs_path,
4924        scan_id: prev_snapshot.scan_id + 1,
4925        branch,
4926        head_commit,
4927        merge: merge_details,
4928        remote_origin_url,
4929        remote_upstream_url,
4930    };
4931
4932    Ok((snapshot, events))
4933}
4934
4935fn status_from_proto(
4936    simple_status: i32,
4937    status: Option<proto::GitFileStatus>,
4938) -> anyhow::Result<FileStatus> {
4939    use proto::git_file_status::Variant;
4940
4941    let Some(variant) = status.and_then(|status| status.variant) else {
4942        let code = proto::GitStatus::from_i32(simple_status)
4943            .with_context(|| format!("Invalid git status code: {simple_status}"))?;
4944        let result = match code {
4945            proto::GitStatus::Added => TrackedStatus {
4946                worktree_status: StatusCode::Added,
4947                index_status: StatusCode::Unmodified,
4948            }
4949            .into(),
4950            proto::GitStatus::Modified => TrackedStatus {
4951                worktree_status: StatusCode::Modified,
4952                index_status: StatusCode::Unmodified,
4953            }
4954            .into(),
4955            proto::GitStatus::Conflict => UnmergedStatus {
4956                first_head: UnmergedStatusCode::Updated,
4957                second_head: UnmergedStatusCode::Updated,
4958            }
4959            .into(),
4960            proto::GitStatus::Deleted => TrackedStatus {
4961                worktree_status: StatusCode::Deleted,
4962                index_status: StatusCode::Unmodified,
4963            }
4964            .into(),
4965            _ => anyhow::bail!("Invalid code for simple status: {simple_status}"),
4966        };
4967        return Ok(result);
4968    };
4969
4970    let result = match variant {
4971        Variant::Untracked(_) => FileStatus::Untracked,
4972        Variant::Ignored(_) => FileStatus::Ignored,
4973        Variant::Unmerged(unmerged) => {
4974            let [first_head, second_head] =
4975                [unmerged.first_head, unmerged.second_head].map(|head| {
4976                    let code = proto::GitStatus::from_i32(head)
4977                        .with_context(|| format!("Invalid git status code: {head}"))?;
4978                    let result = match code {
4979                        proto::GitStatus::Added => UnmergedStatusCode::Added,
4980                        proto::GitStatus::Updated => UnmergedStatusCode::Updated,
4981                        proto::GitStatus::Deleted => UnmergedStatusCode::Deleted,
4982                        _ => anyhow::bail!("Invalid code for unmerged status: {code:?}"),
4983                    };
4984                    Ok(result)
4985                });
4986            let [first_head, second_head] = [first_head?, second_head?];
4987            UnmergedStatus {
4988                first_head,
4989                second_head,
4990            }
4991            .into()
4992        }
4993        Variant::Tracked(tracked) => {
4994            let [index_status, worktree_status] = [tracked.index_status, tracked.worktree_status]
4995                .map(|status| {
4996                    let code = proto::GitStatus::from_i32(status)
4997                        .with_context(|| format!("Invalid git status code: {status}"))?;
4998                    let result = match code {
4999                        proto::GitStatus::Modified => StatusCode::Modified,
5000                        proto::GitStatus::TypeChanged => StatusCode::TypeChanged,
5001                        proto::GitStatus::Added => StatusCode::Added,
5002                        proto::GitStatus::Deleted => StatusCode::Deleted,
5003                        proto::GitStatus::Renamed => StatusCode::Renamed,
5004                        proto::GitStatus::Copied => StatusCode::Copied,
5005                        proto::GitStatus::Unmodified => StatusCode::Unmodified,
5006                        _ => anyhow::bail!("Invalid code for tracked status: {code:?}"),
5007                    };
5008                    Ok(result)
5009                });
5010            let [index_status, worktree_status] = [index_status?, worktree_status?];
5011            TrackedStatus {
5012                index_status,
5013                worktree_status,
5014            }
5015            .into()
5016        }
5017    };
5018    Ok(result)
5019}
5020
5021fn status_to_proto(status: FileStatus) -> proto::GitFileStatus {
5022    use proto::git_file_status::{Tracked, Unmerged, Variant};
5023
5024    let variant = match status {
5025        FileStatus::Untracked => Variant::Untracked(Default::default()),
5026        FileStatus::Ignored => Variant::Ignored(Default::default()),
5027        FileStatus::Unmerged(UnmergedStatus {
5028            first_head,
5029            second_head,
5030        }) => Variant::Unmerged(Unmerged {
5031            first_head: unmerged_status_to_proto(first_head),
5032            second_head: unmerged_status_to_proto(second_head),
5033        }),
5034        FileStatus::Tracked(TrackedStatus {
5035            index_status,
5036            worktree_status,
5037        }) => Variant::Tracked(Tracked {
5038            index_status: tracked_status_to_proto(index_status),
5039            worktree_status: tracked_status_to_proto(worktree_status),
5040        }),
5041    };
5042    proto::GitFileStatus {
5043        variant: Some(variant),
5044    }
5045}
5046
5047fn unmerged_status_to_proto(code: UnmergedStatusCode) -> i32 {
5048    match code {
5049        UnmergedStatusCode::Added => proto::GitStatus::Added as _,
5050        UnmergedStatusCode::Deleted => proto::GitStatus::Deleted as _,
5051        UnmergedStatusCode::Updated => proto::GitStatus::Updated as _,
5052    }
5053}
5054
5055fn tracked_status_to_proto(code: StatusCode) -> i32 {
5056    match code {
5057        StatusCode::Added => proto::GitStatus::Added as _,
5058        StatusCode::Deleted => proto::GitStatus::Deleted as _,
5059        StatusCode::Modified => proto::GitStatus::Modified as _,
5060        StatusCode::Renamed => proto::GitStatus::Renamed as _,
5061        StatusCode::TypeChanged => proto::GitStatus::TypeChanged as _,
5062        StatusCode::Copied => proto::GitStatus::Copied as _,
5063        StatusCode::Unmodified => proto::GitStatus::Unmodified as _,
5064    }
5065}