git_store.rs

   1mod conflict_set;
   2pub mod git_traversal;
   3
   4use crate::{
   5    ProjectEnvironment, ProjectItem, ProjectPath,
   6    buffer_store::{BufferStore, BufferStoreEvent},
   7    worktree_store::{WorktreeStore, WorktreeStoreEvent},
   8};
   9use anyhow::{Context as _, Result, anyhow, bail};
  10use askpass::AskPassDelegate;
  11use buffer_diff::{BufferDiff, BufferDiffEvent};
  12use client::ProjectId;
  13use collections::HashMap;
  14pub use conflict_set::{ConflictRegion, ConflictSet, ConflictSetSnapshot, ConflictSetUpdate};
  15use fs::Fs;
  16use futures::{
  17    FutureExt, StreamExt,
  18    channel::{mpsc, oneshot},
  19    future::{self, Shared},
  20    stream::FuturesOrdered,
  21};
  22use git::{
  23    BuildPermalinkParams, GitHostingProviderRegistry, WORK_DIRECTORY_REPO_PATH,
  24    blame::Blame,
  25    parse_git_remote_url,
  26    repository::{
  27        Branch, CommitDetails, CommitDiff, CommitFile, CommitOptions, DiffType, FetchOptions,
  28        GitRepository, GitRepositoryCheckpoint, PushOptions, Remote, RemoteCommandOutput, RepoPath,
  29        ResetMode, UpstreamTrackingStatus,
  30    },
  31    status::{
  32        FileStatus, GitSummary, StatusCode, TrackedStatus, UnmergedStatus, UnmergedStatusCode,
  33    },
  34};
  35use gpui::{
  36    App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Subscription, Task,
  37    WeakEntity,
  38};
  39use language::{
  40    Buffer, BufferEvent, Language, LanguageRegistry,
  41    proto::{deserialize_version, serialize_version},
  42};
  43use parking_lot::Mutex;
  44use postage::stream::Stream as _;
  45use rpc::{
  46    AnyProtoClient, TypedEnvelope,
  47    proto::{self, FromProto, SSH_PROJECT_ID, ToProto, git_reset, split_repository_update},
  48};
  49use serde::Deserialize;
  50use std::{
  51    cmp::Ordering,
  52    collections::{BTreeSet, VecDeque},
  53    future::Future,
  54    mem,
  55    ops::Range,
  56    path::{Path, PathBuf},
  57    sync::{
  58        Arc,
  59        atomic::{self, AtomicU64},
  60    },
  61    time::Instant,
  62};
  63use sum_tree::{Edit, SumTree, TreeSet};
  64use text::{Bias, BufferId};
  65use util::{ResultExt, debug_panic, post_inc};
  66use worktree::{
  67    File, PathChange, PathKey, PathProgress, PathSummary, PathTarget, ProjectEntryId,
  68    UpdatedGitRepositoriesSet, UpdatedGitRepository, Worktree,
  69};
  70
  71pub struct GitStore {
  72    state: GitStoreState,
  73    buffer_store: Entity<BufferStore>,
  74    worktree_store: Entity<WorktreeStore>,
  75    repositories: HashMap<RepositoryId, Entity<Repository>>,
  76    active_repo_id: Option<RepositoryId>,
  77    #[allow(clippy::type_complexity)]
  78    loading_diffs:
  79        HashMap<(BufferId, DiffKind), Shared<Task<Result<Entity<BufferDiff>, Arc<anyhow::Error>>>>>,
  80    diffs: HashMap<BufferId, Entity<BufferGitState>>,
  81    shared_diffs: HashMap<proto::PeerId, HashMap<BufferId, SharedDiffs>>,
  82    _subscriptions: Vec<Subscription>,
  83}
  84
  85#[derive(Default)]
  86struct SharedDiffs {
  87    unstaged: Option<Entity<BufferDiff>>,
  88    uncommitted: Option<Entity<BufferDiff>>,
  89}
  90
  91struct BufferGitState {
  92    unstaged_diff: Option<WeakEntity<BufferDiff>>,
  93    uncommitted_diff: Option<WeakEntity<BufferDiff>>,
  94    conflict_set: Option<WeakEntity<ConflictSet>>,
  95    recalculate_diff_task: Option<Task<Result<()>>>,
  96    reparse_conflict_markers_task: Option<Task<Result<()>>>,
  97    language: Option<Arc<Language>>,
  98    language_registry: Option<Arc<LanguageRegistry>>,
  99    conflict_updated_futures: Vec<oneshot::Sender<()>>,
 100    recalculating_tx: postage::watch::Sender<bool>,
 101
 102    /// These operation counts are used to ensure that head and index text
 103    /// values read from the git repository are up-to-date with any hunk staging
 104    /// operations that have been performed on the BufferDiff.
 105    ///
 106    /// The operation count is incremented immediately when the user initiates a
 107    /// hunk stage/unstage operation. Then, upon finishing writing the new index
 108    /// text do disk, the `operation count as of write` is updated to reflect
 109    /// the operation count that prompted the write.
 110    hunk_staging_operation_count: usize,
 111    hunk_staging_operation_count_as_of_write: usize,
 112
 113    head_text: Option<Arc<String>>,
 114    index_text: Option<Arc<String>>,
 115    head_changed: bool,
 116    index_changed: bool,
 117    language_changed: bool,
 118}
 119
 120#[derive(Clone, Debug)]
 121enum DiffBasesChange {
 122    SetIndex(Option<String>),
 123    SetHead(Option<String>),
 124    SetEach {
 125        index: Option<String>,
 126        head: Option<String>,
 127    },
 128    SetBoth(Option<String>),
 129}
 130
 131#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
 132enum DiffKind {
 133    Unstaged,
 134    Uncommitted,
 135}
 136
 137enum GitStoreState {
 138    Local {
 139        next_repository_id: Arc<AtomicU64>,
 140        downstream: Option<LocalDownstreamState>,
 141        project_environment: Entity<ProjectEnvironment>,
 142        fs: Arc<dyn Fs>,
 143    },
 144    Ssh {
 145        upstream_client: AnyProtoClient,
 146        upstream_project_id: ProjectId,
 147        downstream: Option<(AnyProtoClient, ProjectId)>,
 148    },
 149    Remote {
 150        upstream_client: AnyProtoClient,
 151        upstream_project_id: ProjectId,
 152    },
 153}
 154
 155enum DownstreamUpdate {
 156    UpdateRepository(RepositorySnapshot),
 157    RemoveRepository(RepositoryId),
 158}
 159
 160struct LocalDownstreamState {
 161    client: AnyProtoClient,
 162    project_id: ProjectId,
 163    updates_tx: mpsc::UnboundedSender<DownstreamUpdate>,
 164    _task: Task<Result<()>>,
 165}
 166
 167#[derive(Clone, Debug)]
 168pub struct GitStoreCheckpoint {
 169    checkpoints_by_work_dir_abs_path: HashMap<Arc<Path>, GitRepositoryCheckpoint>,
 170}
 171
 172#[derive(Clone, Debug, PartialEq, Eq)]
 173pub struct StatusEntry {
 174    pub repo_path: RepoPath,
 175    pub status: FileStatus,
 176}
 177
 178impl StatusEntry {
 179    fn to_proto(&self) -> proto::StatusEntry {
 180        let simple_status = match self.status {
 181            FileStatus::Ignored | FileStatus::Untracked => proto::GitStatus::Added as i32,
 182            FileStatus::Unmerged { .. } => proto::GitStatus::Conflict as i32,
 183            FileStatus::Tracked(TrackedStatus {
 184                index_status,
 185                worktree_status,
 186            }) => tracked_status_to_proto(if worktree_status != StatusCode::Unmodified {
 187                worktree_status
 188            } else {
 189                index_status
 190            }),
 191        };
 192
 193        proto::StatusEntry {
 194            repo_path: self.repo_path.as_ref().to_proto(),
 195            simple_status,
 196            status: Some(status_to_proto(self.status)),
 197        }
 198    }
 199}
 200
 201impl TryFrom<proto::StatusEntry> for StatusEntry {
 202    type Error = anyhow::Error;
 203
 204    fn try_from(value: proto::StatusEntry) -> Result<Self, Self::Error> {
 205        let repo_path = RepoPath(Arc::<Path>::from_proto(value.repo_path));
 206        let status = status_from_proto(value.simple_status, value.status)?;
 207        Ok(Self { repo_path, status })
 208    }
 209}
 210
 211impl sum_tree::Item for StatusEntry {
 212    type Summary = PathSummary<GitSummary>;
 213
 214    fn summary(&self, _: &<Self::Summary as sum_tree::Summary>::Context) -> Self::Summary {
 215        PathSummary {
 216            max_path: self.repo_path.0.clone(),
 217            item_summary: self.status.summary(),
 218        }
 219    }
 220}
 221
 222impl sum_tree::KeyedItem for StatusEntry {
 223    type Key = PathKey;
 224
 225    fn key(&self) -> Self::Key {
 226        PathKey(self.repo_path.0.clone())
 227    }
 228}
 229
 230#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
 231pub struct RepositoryId(pub u64);
 232
 233#[derive(Clone, Debug, Default, PartialEq, Eq)]
 234pub struct MergeDetails {
 235    pub conflicted_paths: TreeSet<RepoPath>,
 236    pub message: Option<SharedString>,
 237    pub heads: Vec<Option<SharedString>>,
 238}
 239
 240#[derive(Clone, Debug, PartialEq, Eq)]
 241pub struct RepositorySnapshot {
 242    pub id: RepositoryId,
 243    pub statuses_by_path: SumTree<StatusEntry>,
 244    pub work_directory_abs_path: Arc<Path>,
 245    pub branch: Option<Branch>,
 246    pub head_commit: Option<CommitDetails>,
 247    pub scan_id: u64,
 248    pub merge: MergeDetails,
 249    pub remote_origin_url: Option<String>,
 250    pub remote_upstream_url: Option<String>,
 251}
 252
 253type JobId = u64;
 254
 255#[derive(Clone, Debug, PartialEq, Eq)]
 256pub struct JobInfo {
 257    pub start: Instant,
 258    pub message: SharedString,
 259}
 260
 261pub struct Repository {
 262    this: WeakEntity<Self>,
 263    snapshot: RepositorySnapshot,
 264    commit_message_buffer: Option<Entity<Buffer>>,
 265    git_store: WeakEntity<GitStore>,
 266    // For a local repository, holds paths that have had worktree events since the last status scan completed,
 267    // and that should be examined during the next status scan.
 268    paths_needing_status_update: BTreeSet<RepoPath>,
 269    job_sender: mpsc::UnboundedSender<GitJob>,
 270    active_jobs: HashMap<JobId, JobInfo>,
 271    job_id: JobId,
 272    askpass_delegates: Arc<Mutex<HashMap<u64, AskPassDelegate>>>,
 273    latest_askpass_id: u64,
 274}
 275
 276impl std::ops::Deref for Repository {
 277    type Target = RepositorySnapshot;
 278
 279    fn deref(&self) -> &Self::Target {
 280        &self.snapshot
 281    }
 282}
 283
 284#[derive(Clone)]
 285pub enum RepositoryState {
 286    Local {
 287        backend: Arc<dyn GitRepository>,
 288        environment: Arc<HashMap<String, String>>,
 289    },
 290    Remote {
 291        project_id: ProjectId,
 292        client: AnyProtoClient,
 293    },
 294}
 295
 296#[derive(Clone, Debug)]
 297pub enum RepositoryEvent {
 298    Updated { full_scan: bool, new_instance: bool },
 299    MergeHeadsChanged,
 300}
 301
 302#[derive(Clone, Debug)]
 303pub struct JobsUpdated;
 304
 305#[derive(Debug)]
 306pub enum GitStoreEvent {
 307    ActiveRepositoryChanged(Option<RepositoryId>),
 308    RepositoryUpdated(RepositoryId, RepositoryEvent, bool),
 309    RepositoryAdded(RepositoryId),
 310    RepositoryRemoved(RepositoryId),
 311    IndexWriteError(anyhow::Error),
 312    JobsUpdated,
 313    ConflictsUpdated,
 314}
 315
 316impl EventEmitter<RepositoryEvent> for Repository {}
 317impl EventEmitter<JobsUpdated> for Repository {}
 318impl EventEmitter<GitStoreEvent> for GitStore {}
 319
 320pub struct GitJob {
 321    job: Box<dyn FnOnce(RepositoryState, &mut AsyncApp) -> Task<()>>,
 322    key: Option<GitJobKey>,
 323}
 324
 325#[derive(PartialEq, Eq)]
 326enum GitJobKey {
 327    WriteIndex(RepoPath),
 328    ReloadBufferDiffBases,
 329    RefreshStatuses,
 330    ReloadGitState,
 331}
 332
 333impl GitStore {
 334    pub fn local(
 335        worktree_store: &Entity<WorktreeStore>,
 336        buffer_store: Entity<BufferStore>,
 337        environment: Entity<ProjectEnvironment>,
 338        fs: Arc<dyn Fs>,
 339        cx: &mut Context<Self>,
 340    ) -> Self {
 341        Self::new(
 342            worktree_store.clone(),
 343            buffer_store,
 344            GitStoreState::Local {
 345                next_repository_id: Arc::new(AtomicU64::new(1)),
 346                downstream: None,
 347                project_environment: environment,
 348                fs,
 349            },
 350            cx,
 351        )
 352    }
 353
 354    pub fn remote(
 355        worktree_store: &Entity<WorktreeStore>,
 356        buffer_store: Entity<BufferStore>,
 357        upstream_client: AnyProtoClient,
 358        project_id: ProjectId,
 359        cx: &mut Context<Self>,
 360    ) -> Self {
 361        Self::new(
 362            worktree_store.clone(),
 363            buffer_store,
 364            GitStoreState::Remote {
 365                upstream_client,
 366                upstream_project_id: project_id,
 367            },
 368            cx,
 369        )
 370    }
 371
 372    pub fn ssh(
 373        worktree_store: &Entity<WorktreeStore>,
 374        buffer_store: Entity<BufferStore>,
 375        upstream_client: AnyProtoClient,
 376        cx: &mut Context<Self>,
 377    ) -> Self {
 378        Self::new(
 379            worktree_store.clone(),
 380            buffer_store,
 381            GitStoreState::Ssh {
 382                upstream_client,
 383                upstream_project_id: ProjectId(SSH_PROJECT_ID),
 384                downstream: None,
 385            },
 386            cx,
 387        )
 388    }
 389
 390    fn new(
 391        worktree_store: Entity<WorktreeStore>,
 392        buffer_store: Entity<BufferStore>,
 393        state: GitStoreState,
 394        cx: &mut Context<Self>,
 395    ) -> Self {
 396        let _subscriptions = vec![
 397            cx.subscribe(&worktree_store, Self::on_worktree_store_event),
 398            cx.subscribe(&buffer_store, Self::on_buffer_store_event),
 399        ];
 400
 401        GitStore {
 402            state,
 403            buffer_store,
 404            worktree_store,
 405            repositories: HashMap::default(),
 406            active_repo_id: None,
 407            _subscriptions,
 408            loading_diffs: HashMap::default(),
 409            shared_diffs: HashMap::default(),
 410            diffs: HashMap::default(),
 411        }
 412    }
 413
 414    pub fn init(client: &AnyProtoClient) {
 415        client.add_entity_request_handler(Self::handle_get_remotes);
 416        client.add_entity_request_handler(Self::handle_get_branches);
 417        client.add_entity_request_handler(Self::handle_get_default_branch);
 418        client.add_entity_request_handler(Self::handle_change_branch);
 419        client.add_entity_request_handler(Self::handle_create_branch);
 420        client.add_entity_request_handler(Self::handle_git_init);
 421        client.add_entity_request_handler(Self::handle_push);
 422        client.add_entity_request_handler(Self::handle_pull);
 423        client.add_entity_request_handler(Self::handle_fetch);
 424        client.add_entity_request_handler(Self::handle_stage);
 425        client.add_entity_request_handler(Self::handle_unstage);
 426        client.add_entity_request_handler(Self::handle_stash);
 427        client.add_entity_request_handler(Self::handle_stash_pop);
 428        client.add_entity_request_handler(Self::handle_commit);
 429        client.add_entity_request_handler(Self::handle_reset);
 430        client.add_entity_request_handler(Self::handle_show);
 431        client.add_entity_request_handler(Self::handle_load_commit_diff);
 432        client.add_entity_request_handler(Self::handle_checkout_files);
 433        client.add_entity_request_handler(Self::handle_open_commit_message_buffer);
 434        client.add_entity_request_handler(Self::handle_set_index_text);
 435        client.add_entity_request_handler(Self::handle_askpass);
 436        client.add_entity_request_handler(Self::handle_check_for_pushed_commits);
 437        client.add_entity_request_handler(Self::handle_git_diff);
 438        client.add_entity_request_handler(Self::handle_open_unstaged_diff);
 439        client.add_entity_request_handler(Self::handle_open_uncommitted_diff);
 440        client.add_entity_message_handler(Self::handle_update_diff_bases);
 441        client.add_entity_request_handler(Self::handle_get_permalink_to_line);
 442        client.add_entity_request_handler(Self::handle_blame_buffer);
 443        client.add_entity_message_handler(Self::handle_update_repository);
 444        client.add_entity_message_handler(Self::handle_remove_repository);
 445        client.add_entity_request_handler(Self::handle_git_clone);
 446    }
 447
 448    pub fn is_local(&self) -> bool {
 449        matches!(self.state, GitStoreState::Local { .. })
 450    }
 451
 452    pub fn shared(&mut self, project_id: u64, client: AnyProtoClient, cx: &mut Context<Self>) {
 453        match &mut self.state {
 454            GitStoreState::Ssh {
 455                downstream: downstream_client,
 456                ..
 457            } => {
 458                for repo in self.repositories.values() {
 459                    let update = repo.read(cx).snapshot.initial_update(project_id);
 460                    for update in split_repository_update(update) {
 461                        client.send(update).log_err();
 462                    }
 463                }
 464                *downstream_client = Some((client, ProjectId(project_id)));
 465            }
 466            GitStoreState::Local {
 467                downstream: downstream_client,
 468                ..
 469            } => {
 470                let mut snapshots = HashMap::default();
 471                let (updates_tx, mut updates_rx) = mpsc::unbounded();
 472                for repo in self.repositories.values() {
 473                    updates_tx
 474                        .unbounded_send(DownstreamUpdate::UpdateRepository(
 475                            repo.read(cx).snapshot.clone(),
 476                        ))
 477                        .ok();
 478                }
 479                *downstream_client = Some(LocalDownstreamState {
 480                    client: client.clone(),
 481                    project_id: ProjectId(project_id),
 482                    updates_tx,
 483                    _task: cx.spawn(async move |this, cx| {
 484                        cx.background_spawn(async move {
 485                            while let Some(update) = updates_rx.next().await {
 486                                match update {
 487                                    DownstreamUpdate::UpdateRepository(snapshot) => {
 488                                        if let Some(old_snapshot) = snapshots.get_mut(&snapshot.id)
 489                                        {
 490                                            let update =
 491                                                snapshot.build_update(old_snapshot, project_id);
 492                                            *old_snapshot = snapshot;
 493                                            for update in split_repository_update(update) {
 494                                                client.send(update)?;
 495                                            }
 496                                        } else {
 497                                            let update = snapshot.initial_update(project_id);
 498                                            for update in split_repository_update(update) {
 499                                                client.send(update)?;
 500                                            }
 501                                            snapshots.insert(snapshot.id, snapshot);
 502                                        }
 503                                    }
 504                                    DownstreamUpdate::RemoveRepository(id) => {
 505                                        client.send(proto::RemoveRepository {
 506                                            project_id,
 507                                            id: id.to_proto(),
 508                                        })?;
 509                                    }
 510                                }
 511                            }
 512                            anyhow::Ok(())
 513                        })
 514                        .await
 515                        .ok();
 516                        this.update(cx, |this, _| {
 517                            if let GitStoreState::Local {
 518                                downstream: downstream_client,
 519                                ..
 520                            } = &mut this.state
 521                            {
 522                                downstream_client.take();
 523                            } else {
 524                                unreachable!("unshared called on remote store");
 525                            }
 526                        })
 527                    }),
 528                });
 529            }
 530            GitStoreState::Remote { .. } => {
 531                debug_panic!("shared called on remote store");
 532            }
 533        }
 534    }
 535
 536    pub fn unshared(&mut self, _cx: &mut Context<Self>) {
 537        match &mut self.state {
 538            GitStoreState::Local {
 539                downstream: downstream_client,
 540                ..
 541            } => {
 542                downstream_client.take();
 543            }
 544            GitStoreState::Ssh {
 545                downstream: downstream_client,
 546                ..
 547            } => {
 548                downstream_client.take();
 549            }
 550            GitStoreState::Remote { .. } => {
 551                debug_panic!("unshared called on remote store");
 552            }
 553        }
 554        self.shared_diffs.clear();
 555    }
 556
 557    pub(crate) fn forget_shared_diffs_for(&mut self, peer_id: &proto::PeerId) {
 558        self.shared_diffs.remove(peer_id);
 559    }
 560
 561    pub fn active_repository(&self) -> Option<Entity<Repository>> {
 562        self.active_repo_id
 563            .as_ref()
 564            .map(|id| self.repositories[id].clone())
 565    }
 566
 567    pub fn open_unstaged_diff(
 568        &mut self,
 569        buffer: Entity<Buffer>,
 570        cx: &mut Context<Self>,
 571    ) -> Task<Result<Entity<BufferDiff>>> {
 572        let buffer_id = buffer.read(cx).remote_id();
 573        if let Some(diff_state) = self.diffs.get(&buffer_id)
 574            && let Some(unstaged_diff) = diff_state
 575                .read(cx)
 576                .unstaged_diff
 577                .as_ref()
 578                .and_then(|weak| weak.upgrade())
 579        {
 580            if let Some(task) =
 581                diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
 582            {
 583                return cx.background_executor().spawn(async move {
 584                    task.await;
 585                    Ok(unstaged_diff)
 586                });
 587            }
 588            return Task::ready(Ok(unstaged_diff));
 589        }
 590
 591        let Some((repo, repo_path)) =
 592            self.repository_and_path_for_buffer_id(buffer.read(cx).remote_id(), cx)
 593        else {
 594            return Task::ready(Err(anyhow!("failed to find git repository for buffer")));
 595        };
 596
 597        let task = self
 598            .loading_diffs
 599            .entry((buffer_id, DiffKind::Unstaged))
 600            .or_insert_with(|| {
 601                let staged_text = repo.update(cx, |repo, cx| {
 602                    repo.load_staged_text(buffer_id, repo_path, cx)
 603                });
 604                cx.spawn(async move |this, cx| {
 605                    Self::open_diff_internal(
 606                        this,
 607                        DiffKind::Unstaged,
 608                        staged_text.await.map(DiffBasesChange::SetIndex),
 609                        buffer,
 610                        cx,
 611                    )
 612                    .await
 613                    .map_err(Arc::new)
 614                })
 615                .shared()
 616            })
 617            .clone();
 618
 619        cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
 620    }
 621
 622    pub fn open_uncommitted_diff(
 623        &mut self,
 624        buffer: Entity<Buffer>,
 625        cx: &mut Context<Self>,
 626    ) -> Task<Result<Entity<BufferDiff>>> {
 627        let buffer_id = buffer.read(cx).remote_id();
 628
 629        if let Some(diff_state) = self.diffs.get(&buffer_id)
 630            && let Some(uncommitted_diff) = diff_state
 631                .read(cx)
 632                .uncommitted_diff
 633                .as_ref()
 634                .and_then(|weak| weak.upgrade())
 635        {
 636            if let Some(task) =
 637                diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
 638            {
 639                return cx.background_executor().spawn(async move {
 640                    task.await;
 641                    Ok(uncommitted_diff)
 642                });
 643            }
 644            return Task::ready(Ok(uncommitted_diff));
 645        }
 646
 647        let Some((repo, repo_path)) =
 648            self.repository_and_path_for_buffer_id(buffer.read(cx).remote_id(), cx)
 649        else {
 650            return Task::ready(Err(anyhow!("failed to find git repository for buffer")));
 651        };
 652
 653        let task = self
 654            .loading_diffs
 655            .entry((buffer_id, DiffKind::Uncommitted))
 656            .or_insert_with(|| {
 657                let changes = repo.update(cx, |repo, cx| {
 658                    repo.load_committed_text(buffer_id, repo_path, cx)
 659                });
 660
 661                cx.spawn(async move |this, cx| {
 662                    Self::open_diff_internal(this, DiffKind::Uncommitted, changes.await, buffer, cx)
 663                        .await
 664                        .map_err(Arc::new)
 665                })
 666                .shared()
 667            })
 668            .clone();
 669
 670        cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
 671    }
 672
 673    async fn open_diff_internal(
 674        this: WeakEntity<Self>,
 675        kind: DiffKind,
 676        texts: Result<DiffBasesChange>,
 677        buffer_entity: Entity<Buffer>,
 678        cx: &mut AsyncApp,
 679    ) -> Result<Entity<BufferDiff>> {
 680        let diff_bases_change = match texts {
 681            Err(e) => {
 682                this.update(cx, |this, cx| {
 683                    let buffer = buffer_entity.read(cx);
 684                    let buffer_id = buffer.remote_id();
 685                    this.loading_diffs.remove(&(buffer_id, kind));
 686                })?;
 687                return Err(e);
 688            }
 689            Ok(change) => change,
 690        };
 691
 692        this.update(cx, |this, cx| {
 693            let buffer = buffer_entity.read(cx);
 694            let buffer_id = buffer.remote_id();
 695            let language = buffer.language().cloned();
 696            let language_registry = buffer.language_registry();
 697            let text_snapshot = buffer.text_snapshot();
 698            this.loading_diffs.remove(&(buffer_id, kind));
 699
 700            let git_store = cx.weak_entity();
 701            let diff_state = this
 702                .diffs
 703                .entry(buffer_id)
 704                .or_insert_with(|| cx.new(|_| BufferGitState::new(git_store)));
 705
 706            let diff = cx.new(|cx| BufferDiff::new(&text_snapshot, cx));
 707
 708            cx.subscribe(&diff, Self::on_buffer_diff_event).detach();
 709            diff_state.update(cx, |diff_state, cx| {
 710                diff_state.language = language;
 711                diff_state.language_registry = language_registry;
 712
 713                match kind {
 714                    DiffKind::Unstaged => diff_state.unstaged_diff = Some(diff.downgrade()),
 715                    DiffKind::Uncommitted => {
 716                        let unstaged_diff = if let Some(diff) = diff_state.unstaged_diff() {
 717                            diff
 718                        } else {
 719                            let unstaged_diff = cx.new(|cx| BufferDiff::new(&text_snapshot, cx));
 720                            diff_state.unstaged_diff = Some(unstaged_diff.downgrade());
 721                            unstaged_diff
 722                        };
 723
 724                        diff.update(cx, |diff, _| diff.set_secondary_diff(unstaged_diff));
 725                        diff_state.uncommitted_diff = Some(diff.downgrade())
 726                    }
 727                }
 728
 729                diff_state.diff_bases_changed(text_snapshot, Some(diff_bases_change), cx);
 730                let rx = diff_state.wait_for_recalculation();
 731
 732                anyhow::Ok(async move {
 733                    if let Some(rx) = rx {
 734                        rx.await;
 735                    }
 736                    Ok(diff)
 737                })
 738            })
 739        })??
 740        .await
 741    }
 742
 743    pub fn get_unstaged_diff(&self, buffer_id: BufferId, cx: &App) -> Option<Entity<BufferDiff>> {
 744        let diff_state = self.diffs.get(&buffer_id)?;
 745        diff_state.read(cx).unstaged_diff.as_ref()?.upgrade()
 746    }
 747
 748    pub fn get_uncommitted_diff(
 749        &self,
 750        buffer_id: BufferId,
 751        cx: &App,
 752    ) -> Option<Entity<BufferDiff>> {
 753        let diff_state = self.diffs.get(&buffer_id)?;
 754        diff_state.read(cx).uncommitted_diff.as_ref()?.upgrade()
 755    }
 756
 757    pub fn open_conflict_set(
 758        &mut self,
 759        buffer: Entity<Buffer>,
 760        cx: &mut Context<Self>,
 761    ) -> Entity<ConflictSet> {
 762        log::debug!("open conflict set");
 763        let buffer_id = buffer.read(cx).remote_id();
 764
 765        if let Some(git_state) = self.diffs.get(&buffer_id)
 766            && let Some(conflict_set) = git_state
 767                .read(cx)
 768                .conflict_set
 769                .as_ref()
 770                .and_then(|weak| weak.upgrade())
 771        {
 772            let conflict_set = conflict_set;
 773            let buffer_snapshot = buffer.read(cx).text_snapshot();
 774
 775            git_state.update(cx, |state, cx| {
 776                let _ = state.reparse_conflict_markers(buffer_snapshot, cx);
 777            });
 778
 779            return conflict_set;
 780        }
 781
 782        let is_unmerged = self
 783            .repository_and_path_for_buffer_id(buffer_id, cx)
 784            .is_some_and(|(repo, path)| repo.read(cx).snapshot.has_conflict(&path));
 785        let git_store = cx.weak_entity();
 786        let buffer_git_state = self
 787            .diffs
 788            .entry(buffer_id)
 789            .or_insert_with(|| cx.new(|_| BufferGitState::new(git_store)));
 790        let conflict_set = cx.new(|cx| ConflictSet::new(buffer_id, is_unmerged, cx));
 791
 792        self._subscriptions
 793            .push(cx.subscribe(&conflict_set, |_, _, _, cx| {
 794                cx.emit(GitStoreEvent::ConflictsUpdated);
 795            }));
 796
 797        buffer_git_state.update(cx, |state, cx| {
 798            state.conflict_set = Some(conflict_set.downgrade());
 799            let buffer_snapshot = buffer.read(cx).text_snapshot();
 800            let _ = state.reparse_conflict_markers(buffer_snapshot, cx);
 801        });
 802
 803        conflict_set
 804    }
 805
 806    pub fn project_path_git_status(
 807        &self,
 808        project_path: &ProjectPath,
 809        cx: &App,
 810    ) -> Option<FileStatus> {
 811        let (repo, repo_path) = self.repository_and_path_for_project_path(project_path, cx)?;
 812        Some(repo.read(cx).status_for_path(&repo_path)?.status)
 813    }
 814
 815    pub fn checkpoint(&self, cx: &mut App) -> Task<Result<GitStoreCheckpoint>> {
 816        let mut work_directory_abs_paths = Vec::new();
 817        let mut checkpoints = Vec::new();
 818        for repository in self.repositories.values() {
 819            repository.update(cx, |repository, _| {
 820                work_directory_abs_paths.push(repository.snapshot.work_directory_abs_path.clone());
 821                checkpoints.push(repository.checkpoint().map(|checkpoint| checkpoint?));
 822            });
 823        }
 824
 825        cx.background_executor().spawn(async move {
 826            let checkpoints = future::try_join_all(checkpoints).await?;
 827            Ok(GitStoreCheckpoint {
 828                checkpoints_by_work_dir_abs_path: work_directory_abs_paths
 829                    .into_iter()
 830                    .zip(checkpoints)
 831                    .collect(),
 832            })
 833        })
 834    }
 835
 836    pub fn restore_checkpoint(
 837        &self,
 838        checkpoint: GitStoreCheckpoint,
 839        cx: &mut App,
 840    ) -> Task<Result<()>> {
 841        let repositories_by_work_dir_abs_path = self
 842            .repositories
 843            .values()
 844            .map(|repo| (repo.read(cx).snapshot.work_directory_abs_path.clone(), repo))
 845            .collect::<HashMap<_, _>>();
 846
 847        let mut tasks = Vec::new();
 848        for (work_dir_abs_path, checkpoint) in checkpoint.checkpoints_by_work_dir_abs_path {
 849            if let Some(repository) = repositories_by_work_dir_abs_path.get(&work_dir_abs_path) {
 850                let restore = repository.update(cx, |repository, _| {
 851                    repository.restore_checkpoint(checkpoint)
 852                });
 853                tasks.push(async move { restore.await? });
 854            }
 855        }
 856        cx.background_spawn(async move {
 857            future::try_join_all(tasks).await?;
 858            Ok(())
 859        })
 860    }
 861
 862    /// Compares two checkpoints, returning true if they are equal.
 863    pub fn compare_checkpoints(
 864        &self,
 865        left: GitStoreCheckpoint,
 866        mut right: GitStoreCheckpoint,
 867        cx: &mut App,
 868    ) -> Task<Result<bool>> {
 869        let repositories_by_work_dir_abs_path = self
 870            .repositories
 871            .values()
 872            .map(|repo| (repo.read(cx).snapshot.work_directory_abs_path.clone(), repo))
 873            .collect::<HashMap<_, _>>();
 874
 875        let mut tasks = Vec::new();
 876        for (work_dir_abs_path, left_checkpoint) in left.checkpoints_by_work_dir_abs_path {
 877            if let Some(right_checkpoint) = right
 878                .checkpoints_by_work_dir_abs_path
 879                .remove(&work_dir_abs_path)
 880            {
 881                if let Some(repository) = repositories_by_work_dir_abs_path.get(&work_dir_abs_path)
 882                {
 883                    let compare = repository.update(cx, |repository, _| {
 884                        repository.compare_checkpoints(left_checkpoint, right_checkpoint)
 885                    });
 886
 887                    tasks.push(async move { compare.await? });
 888                }
 889            } else {
 890                return Task::ready(Ok(false));
 891            }
 892        }
 893        cx.background_spawn(async move {
 894            Ok(future::try_join_all(tasks)
 895                .await?
 896                .into_iter()
 897                .all(|result| result))
 898        })
 899    }
 900
 901    /// Blames a buffer.
 902    pub fn blame_buffer(
 903        &self,
 904        buffer: &Entity<Buffer>,
 905        version: Option<clock::Global>,
 906        cx: &mut App,
 907    ) -> Task<Result<Option<Blame>>> {
 908        let buffer = buffer.read(cx);
 909        let Some((repo, repo_path)) =
 910            self.repository_and_path_for_buffer_id(buffer.remote_id(), cx)
 911        else {
 912            return Task::ready(Err(anyhow!("failed to find a git repository for buffer")));
 913        };
 914        let content = match &version {
 915            Some(version) => buffer.rope_for_version(version),
 916            None => buffer.as_rope().clone(),
 917        };
 918        let version = version.unwrap_or(buffer.version());
 919        let buffer_id = buffer.remote_id();
 920
 921        let rx = repo.update(cx, |repo, _| {
 922            repo.send_job(None, move |state, _| async move {
 923                match state {
 924                    RepositoryState::Local { backend, .. } => backend
 925                        .blame(repo_path.clone(), content)
 926                        .await
 927                        .with_context(|| format!("Failed to blame {:?}", repo_path.0))
 928                        .map(Some),
 929                    RepositoryState::Remote { project_id, client } => {
 930                        let response = client
 931                            .request(proto::BlameBuffer {
 932                                project_id: project_id.to_proto(),
 933                                buffer_id: buffer_id.into(),
 934                                version: serialize_version(&version),
 935                            })
 936                            .await?;
 937                        Ok(deserialize_blame_buffer_response(response))
 938                    }
 939                }
 940            })
 941        });
 942
 943        cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
 944    }
 945
 946    pub fn get_permalink_to_line(
 947        &self,
 948        buffer: &Entity<Buffer>,
 949        selection: Range<u32>,
 950        cx: &mut App,
 951    ) -> Task<Result<url::Url>> {
 952        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
 953            return Task::ready(Err(anyhow!("buffer has no file")));
 954        };
 955
 956        let Some((repo, repo_path)) = self.repository_and_path_for_project_path(
 957            &(file.worktree.read(cx).id(), file.path.clone()).into(),
 958            cx,
 959        ) else {
 960            // If we're not in a Git repo, check whether this is a Rust source
 961            // file in the Cargo registry (presumably opened with go-to-definition
 962            // from a normal Rust file). If so, we can put together a permalink
 963            // using crate metadata.
 964            if buffer
 965                .read(cx)
 966                .language()
 967                .is_none_or(|lang| lang.name() != "Rust".into())
 968            {
 969                return Task::ready(Err(anyhow!("no permalink available")));
 970            }
 971            let Some(file_path) = file.worktree.read(cx).absolutize(&file.path).ok() else {
 972                return Task::ready(Err(anyhow!("no permalink available")));
 973            };
 974            return cx.spawn(async move |cx| {
 975                let provider_registry = cx.update(GitHostingProviderRegistry::default_global)?;
 976                get_permalink_in_rust_registry_src(provider_registry, file_path, selection)
 977                    .context("no permalink available")
 978            });
 979
 980            // TODO remote case
 981        };
 982
 983        let buffer_id = buffer.read(cx).remote_id();
 984        let branch = repo.read(cx).branch.clone();
 985        let remote = branch
 986            .as_ref()
 987            .and_then(|b| b.upstream.as_ref())
 988            .and_then(|b| b.remote_name())
 989            .unwrap_or("origin")
 990            .to_string();
 991
 992        let rx = repo.update(cx, |repo, _| {
 993            repo.send_job(None, move |state, cx| async move {
 994                match state {
 995                    RepositoryState::Local { backend, .. } => {
 996                        let origin_url = backend
 997                            .remote_url(&remote)
 998                            .with_context(|| format!("remote \"{remote}\" not found"))?;
 999
1000                        let sha = backend.head_sha().await.context("reading HEAD SHA")?;
1001
1002                        let provider_registry =
1003                            cx.update(GitHostingProviderRegistry::default_global)?;
1004
1005                        let (provider, remote) =
1006                            parse_git_remote_url(provider_registry, &origin_url)
1007                                .context("parsing Git remote URL")?;
1008
1009                        let path = repo_path.to_str().with_context(|| {
1010                            format!("converting repo path {repo_path:?} to string")
1011                        })?;
1012
1013                        Ok(provider.build_permalink(
1014                            remote,
1015                            BuildPermalinkParams {
1016                                sha: &sha,
1017                                path,
1018                                selection: Some(selection),
1019                            },
1020                        ))
1021                    }
1022                    RepositoryState::Remote { project_id, client } => {
1023                        let response = client
1024                            .request(proto::GetPermalinkToLine {
1025                                project_id: project_id.to_proto(),
1026                                buffer_id: buffer_id.into(),
1027                                selection: Some(proto::Range {
1028                                    start: selection.start as u64,
1029                                    end: selection.end as u64,
1030                                }),
1031                            })
1032                            .await?;
1033
1034                        url::Url::parse(&response.permalink).context("failed to parse permalink")
1035                    }
1036                }
1037            })
1038        });
1039        cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
1040    }
1041
1042    fn downstream_client(&self) -> Option<(AnyProtoClient, ProjectId)> {
1043        match &self.state {
1044            GitStoreState::Local {
1045                downstream: downstream_client,
1046                ..
1047            } => downstream_client
1048                .as_ref()
1049                .map(|state| (state.client.clone(), state.project_id)),
1050            GitStoreState::Ssh {
1051                downstream: downstream_client,
1052                ..
1053            } => downstream_client.clone(),
1054            GitStoreState::Remote { .. } => None,
1055        }
1056    }
1057
1058    fn upstream_client(&self) -> Option<AnyProtoClient> {
1059        match &self.state {
1060            GitStoreState::Local { .. } => None,
1061            GitStoreState::Ssh {
1062                upstream_client, ..
1063            }
1064            | GitStoreState::Remote {
1065                upstream_client, ..
1066            } => Some(upstream_client.clone()),
1067        }
1068    }
1069
1070    fn on_worktree_store_event(
1071        &mut self,
1072        worktree_store: Entity<WorktreeStore>,
1073        event: &WorktreeStoreEvent,
1074        cx: &mut Context<Self>,
1075    ) {
1076        let GitStoreState::Local {
1077            project_environment,
1078            downstream,
1079            next_repository_id,
1080            fs,
1081        } = &self.state
1082        else {
1083            return;
1084        };
1085
1086        match event {
1087            WorktreeStoreEvent::WorktreeUpdatedEntries(worktree_id, updated_entries) => {
1088                if let Some(worktree) = self
1089                    .worktree_store
1090                    .read(cx)
1091                    .worktree_for_id(*worktree_id, cx)
1092                {
1093                    let paths_by_git_repo =
1094                        self.process_updated_entries(&worktree, updated_entries, cx);
1095                    let downstream = downstream
1096                        .as_ref()
1097                        .map(|downstream| downstream.updates_tx.clone());
1098                    cx.spawn(async move |_, cx| {
1099                        let paths_by_git_repo = paths_by_git_repo.await;
1100                        for (repo, paths) in paths_by_git_repo {
1101                            repo.update(cx, |repo, cx| {
1102                                repo.paths_changed(paths, downstream.clone(), cx);
1103                            })
1104                            .ok();
1105                        }
1106                    })
1107                    .detach();
1108                }
1109            }
1110            WorktreeStoreEvent::WorktreeUpdatedGitRepositories(worktree_id, changed_repos) => {
1111                let Some(worktree) = worktree_store.read(cx).worktree_for_id(*worktree_id, cx)
1112                else {
1113                    return;
1114                };
1115                if !worktree.read(cx).is_visible() {
1116                    log::debug!(
1117                        "not adding repositories for local worktree {:?} because it's not visible",
1118                        worktree.read(cx).abs_path()
1119                    );
1120                    return;
1121                }
1122                self.update_repositories_from_worktree(
1123                    project_environment.clone(),
1124                    next_repository_id.clone(),
1125                    downstream
1126                        .as_ref()
1127                        .map(|downstream| downstream.updates_tx.clone()),
1128                    changed_repos.clone(),
1129                    fs.clone(),
1130                    cx,
1131                );
1132                self.local_worktree_git_repos_changed(worktree, changed_repos, cx);
1133            }
1134            _ => {}
1135        }
1136    }
1137
1138    fn on_repository_event(
1139        &mut self,
1140        repo: Entity<Repository>,
1141        event: &RepositoryEvent,
1142        cx: &mut Context<Self>,
1143    ) {
1144        let id = repo.read(cx).id;
1145        let repo_snapshot = repo.read(cx).snapshot.clone();
1146        for (buffer_id, diff) in self.diffs.iter() {
1147            if let Some((buffer_repo, repo_path)) =
1148                self.repository_and_path_for_buffer_id(*buffer_id, cx)
1149                && buffer_repo == repo
1150            {
1151                diff.update(cx, |diff, cx| {
1152                    if let Some(conflict_set) = &diff.conflict_set {
1153                        let conflict_status_changed =
1154                            conflict_set.update(cx, |conflict_set, cx| {
1155                                let has_conflict = repo_snapshot.has_conflict(&repo_path);
1156                                conflict_set.set_has_conflict(has_conflict, cx)
1157                            })?;
1158                        if conflict_status_changed {
1159                            let buffer_store = self.buffer_store.read(cx);
1160                            if let Some(buffer) = buffer_store.get(*buffer_id) {
1161                                let _ = diff
1162                                    .reparse_conflict_markers(buffer.read(cx).text_snapshot(), cx);
1163                            }
1164                        }
1165                    }
1166                    anyhow::Ok(())
1167                })
1168                .ok();
1169            }
1170        }
1171        cx.emit(GitStoreEvent::RepositoryUpdated(
1172            id,
1173            event.clone(),
1174            self.active_repo_id == Some(id),
1175        ))
1176    }
1177
1178    fn on_jobs_updated(&mut self, _: Entity<Repository>, _: &JobsUpdated, cx: &mut Context<Self>) {
1179        cx.emit(GitStoreEvent::JobsUpdated)
1180    }
1181
1182    /// Update our list of repositories and schedule git scans in response to a notification from a worktree,
1183    fn update_repositories_from_worktree(
1184        &mut self,
1185        project_environment: Entity<ProjectEnvironment>,
1186        next_repository_id: Arc<AtomicU64>,
1187        updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
1188        updated_git_repositories: UpdatedGitRepositoriesSet,
1189        fs: Arc<dyn Fs>,
1190        cx: &mut Context<Self>,
1191    ) {
1192        let mut removed_ids = Vec::new();
1193        for update in updated_git_repositories.iter() {
1194            if let Some((id, existing)) = self.repositories.iter().find(|(_, repo)| {
1195                let existing_work_directory_abs_path =
1196                    repo.read(cx).work_directory_abs_path.clone();
1197                Some(&existing_work_directory_abs_path)
1198                    == update.old_work_directory_abs_path.as_ref()
1199                    || Some(&existing_work_directory_abs_path)
1200                        == update.new_work_directory_abs_path.as_ref()
1201            }) {
1202                if let Some(new_work_directory_abs_path) =
1203                    update.new_work_directory_abs_path.clone()
1204                {
1205                    existing.update(cx, |existing, cx| {
1206                        existing.snapshot.work_directory_abs_path = new_work_directory_abs_path;
1207                        existing.schedule_scan(updates_tx.clone(), cx);
1208                    });
1209                } else {
1210                    removed_ids.push(*id);
1211                }
1212            } else if let UpdatedGitRepository {
1213                new_work_directory_abs_path: Some(work_directory_abs_path),
1214                dot_git_abs_path: Some(dot_git_abs_path),
1215                repository_dir_abs_path: Some(repository_dir_abs_path),
1216                common_dir_abs_path: Some(common_dir_abs_path),
1217                ..
1218            } = update
1219            {
1220                let id = RepositoryId(next_repository_id.fetch_add(1, atomic::Ordering::Release));
1221                let git_store = cx.weak_entity();
1222                let repo = cx.new(|cx| {
1223                    let mut repo = Repository::local(
1224                        id,
1225                        work_directory_abs_path.clone(),
1226                        dot_git_abs_path.clone(),
1227                        repository_dir_abs_path.clone(),
1228                        common_dir_abs_path.clone(),
1229                        project_environment.downgrade(),
1230                        fs.clone(),
1231                        git_store,
1232                        cx,
1233                    );
1234                    repo.schedule_scan(updates_tx.clone(), cx);
1235                    repo
1236                });
1237                self._subscriptions
1238                    .push(cx.subscribe(&repo, Self::on_repository_event));
1239                self._subscriptions
1240                    .push(cx.subscribe(&repo, Self::on_jobs_updated));
1241                self.repositories.insert(id, repo);
1242                cx.emit(GitStoreEvent::RepositoryAdded(id));
1243                self.active_repo_id.get_or_insert_with(|| {
1244                    cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
1245                    id
1246                });
1247            }
1248        }
1249
1250        for id in removed_ids {
1251            if self.active_repo_id == Some(id) {
1252                self.active_repo_id = None;
1253                cx.emit(GitStoreEvent::ActiveRepositoryChanged(None));
1254            }
1255            self.repositories.remove(&id);
1256            if let Some(updates_tx) = updates_tx.as_ref() {
1257                updates_tx
1258                    .unbounded_send(DownstreamUpdate::RemoveRepository(id))
1259                    .ok();
1260            }
1261        }
1262    }
1263
1264    fn on_buffer_store_event(
1265        &mut self,
1266        _: Entity<BufferStore>,
1267        event: &BufferStoreEvent,
1268        cx: &mut Context<Self>,
1269    ) {
1270        match event {
1271            BufferStoreEvent::BufferAdded(buffer) => {
1272                cx.subscribe(buffer, |this, buffer, event, cx| {
1273                    if let BufferEvent::LanguageChanged = event {
1274                        let buffer_id = buffer.read(cx).remote_id();
1275                        if let Some(diff_state) = this.diffs.get(&buffer_id) {
1276                            diff_state.update(cx, |diff_state, cx| {
1277                                diff_state.buffer_language_changed(buffer, cx);
1278                            });
1279                        }
1280                    }
1281                })
1282                .detach();
1283            }
1284            BufferStoreEvent::SharedBufferClosed(peer_id, buffer_id) => {
1285                if let Some(diffs) = self.shared_diffs.get_mut(peer_id) {
1286                    diffs.remove(buffer_id);
1287                }
1288            }
1289            BufferStoreEvent::BufferDropped(buffer_id) => {
1290                self.diffs.remove(buffer_id);
1291                for diffs in self.shared_diffs.values_mut() {
1292                    diffs.remove(buffer_id);
1293                }
1294            }
1295
1296            _ => {}
1297        }
1298    }
1299
1300    pub fn recalculate_buffer_diffs(
1301        &mut self,
1302        buffers: Vec<Entity<Buffer>>,
1303        cx: &mut Context<Self>,
1304    ) -> impl Future<Output = ()> + use<> {
1305        let mut futures = Vec::new();
1306        for buffer in buffers {
1307            if let Some(diff_state) = self.diffs.get_mut(&buffer.read(cx).remote_id()) {
1308                let buffer = buffer.read(cx).text_snapshot();
1309                diff_state.update(cx, |diff_state, cx| {
1310                    diff_state.recalculate_diffs(buffer.clone(), cx);
1311                    futures.extend(diff_state.wait_for_recalculation().map(FutureExt::boxed));
1312                });
1313                futures.push(diff_state.update(cx, |diff_state, cx| {
1314                    diff_state
1315                        .reparse_conflict_markers(buffer, cx)
1316                        .map(|_| {})
1317                        .boxed()
1318                }));
1319            }
1320        }
1321        async move {
1322            futures::future::join_all(futures).await;
1323        }
1324    }
1325
1326    fn on_buffer_diff_event(
1327        &mut self,
1328        diff: Entity<buffer_diff::BufferDiff>,
1329        event: &BufferDiffEvent,
1330        cx: &mut Context<Self>,
1331    ) {
1332        if let BufferDiffEvent::HunksStagedOrUnstaged(new_index_text) = event {
1333            let buffer_id = diff.read(cx).buffer_id;
1334            if let Some(diff_state) = self.diffs.get(&buffer_id) {
1335                let hunk_staging_operation_count = diff_state.update(cx, |diff_state, _| {
1336                    diff_state.hunk_staging_operation_count += 1;
1337                    diff_state.hunk_staging_operation_count
1338                });
1339                if let Some((repo, path)) = self.repository_and_path_for_buffer_id(buffer_id, cx) {
1340                    let recv = repo.update(cx, |repo, cx| {
1341                        log::debug!("hunks changed for {}", path.display());
1342                        repo.spawn_set_index_text_job(
1343                            path,
1344                            new_index_text.as_ref().map(|rope| rope.to_string()),
1345                            Some(hunk_staging_operation_count),
1346                            cx,
1347                        )
1348                    });
1349                    let diff = diff.downgrade();
1350                    cx.spawn(async move |this, cx| {
1351                        if let Ok(Err(error)) = cx.background_spawn(recv).await {
1352                            diff.update(cx, |diff, cx| {
1353                                diff.clear_pending_hunks(cx);
1354                            })
1355                            .ok();
1356                            this.update(cx, |_, cx| cx.emit(GitStoreEvent::IndexWriteError(error)))
1357                                .ok();
1358                        }
1359                    })
1360                    .detach();
1361                }
1362            }
1363        }
1364    }
1365
1366    fn local_worktree_git_repos_changed(
1367        &mut self,
1368        worktree: Entity<Worktree>,
1369        changed_repos: &UpdatedGitRepositoriesSet,
1370        cx: &mut Context<Self>,
1371    ) {
1372        log::debug!("local worktree repos changed");
1373        debug_assert!(worktree.read(cx).is_local());
1374
1375        for repository in self.repositories.values() {
1376            repository.update(cx, |repository, cx| {
1377                let repo_abs_path = &repository.work_directory_abs_path;
1378                if changed_repos.iter().any(|update| {
1379                    update.old_work_directory_abs_path.as_ref() == Some(repo_abs_path)
1380                        || update.new_work_directory_abs_path.as_ref() == Some(repo_abs_path)
1381                }) {
1382                    repository.reload_buffer_diff_bases(cx);
1383                }
1384            });
1385        }
1386    }
1387
1388    pub fn repositories(&self) -> &HashMap<RepositoryId, Entity<Repository>> {
1389        &self.repositories
1390    }
1391
1392    pub fn status_for_buffer_id(&self, buffer_id: BufferId, cx: &App) -> Option<FileStatus> {
1393        let (repo, path) = self.repository_and_path_for_buffer_id(buffer_id, cx)?;
1394        let status = repo.read(cx).snapshot.status_for_path(&path)?;
1395        Some(status.status)
1396    }
1397
1398    pub fn repository_and_path_for_buffer_id(
1399        &self,
1400        buffer_id: BufferId,
1401        cx: &App,
1402    ) -> Option<(Entity<Repository>, RepoPath)> {
1403        let buffer = self.buffer_store.read(cx).get(buffer_id)?;
1404        let project_path = buffer.read(cx).project_path(cx)?;
1405        self.repository_and_path_for_project_path(&project_path, cx)
1406    }
1407
1408    pub fn repository_and_path_for_project_path(
1409        &self,
1410        path: &ProjectPath,
1411        cx: &App,
1412    ) -> Option<(Entity<Repository>, RepoPath)> {
1413        let abs_path = self.worktree_store.read(cx).absolutize(path, cx)?;
1414        self.repositories
1415            .values()
1416            .filter_map(|repo| {
1417                let repo_path = repo.read(cx).abs_path_to_repo_path(&abs_path)?;
1418                Some((repo.clone(), repo_path))
1419            })
1420            .max_by_key(|(repo, _)| repo.read(cx).work_directory_abs_path.clone())
1421    }
1422
1423    pub fn git_init(
1424        &self,
1425        path: Arc<Path>,
1426        fallback_branch_name: String,
1427        cx: &App,
1428    ) -> Task<Result<()>> {
1429        match &self.state {
1430            GitStoreState::Local { fs, .. } => {
1431                let fs = fs.clone();
1432                cx.background_executor()
1433                    .spawn(async move { fs.git_init(&path, fallback_branch_name) })
1434            }
1435            GitStoreState::Ssh {
1436                upstream_client,
1437                upstream_project_id: project_id,
1438                ..
1439            }
1440            | GitStoreState::Remote {
1441                upstream_client,
1442                upstream_project_id: project_id,
1443                ..
1444            } => {
1445                let client = upstream_client.clone();
1446                let project_id = *project_id;
1447                cx.background_executor().spawn(async move {
1448                    client
1449                        .request(proto::GitInit {
1450                            project_id: project_id.0,
1451                            abs_path: path.to_string_lossy().to_string(),
1452                            fallback_branch_name,
1453                        })
1454                        .await?;
1455                    Ok(())
1456                })
1457            }
1458        }
1459    }
1460
1461    pub fn git_clone(
1462        &self,
1463        repo: String,
1464        path: impl Into<Arc<std::path::Path>>,
1465        cx: &App,
1466    ) -> Task<Result<()>> {
1467        let path = path.into();
1468        match &self.state {
1469            GitStoreState::Local { fs, .. } => {
1470                let fs = fs.clone();
1471                cx.background_executor()
1472                    .spawn(async move { fs.git_clone(&repo, &path).await })
1473            }
1474            GitStoreState::Ssh {
1475                upstream_client,
1476                upstream_project_id,
1477                ..
1478            } => {
1479                let request = upstream_client.request(proto::GitClone {
1480                    project_id: upstream_project_id.0,
1481                    abs_path: path.to_string_lossy().to_string(),
1482                    remote_repo: repo,
1483                });
1484
1485                cx.background_spawn(async move {
1486                    let result = request.await?;
1487
1488                    match result.success {
1489                        true => Ok(()),
1490                        false => Err(anyhow!("Git Clone failed")),
1491                    }
1492                })
1493            }
1494            GitStoreState::Remote { .. } => {
1495                Task::ready(Err(anyhow!("Git Clone isn't supported for remote users")))
1496            }
1497        }
1498    }
1499
1500    async fn handle_update_repository(
1501        this: Entity<Self>,
1502        envelope: TypedEnvelope<proto::UpdateRepository>,
1503        mut cx: AsyncApp,
1504    ) -> Result<()> {
1505        this.update(&mut cx, |this, cx| {
1506            let mut update = envelope.payload;
1507
1508            let id = RepositoryId::from_proto(update.id);
1509            let client = this.upstream_client().context("no upstream client")?;
1510
1511            let mut is_new = false;
1512            let repo = this.repositories.entry(id).or_insert_with(|| {
1513                is_new = true;
1514                let git_store = cx.weak_entity();
1515                cx.new(|cx| {
1516                    Repository::remote(
1517                        id,
1518                        Path::new(&update.abs_path).into(),
1519                        ProjectId(update.project_id),
1520                        client,
1521                        git_store,
1522                        cx,
1523                    )
1524                })
1525            });
1526            if is_new {
1527                this._subscriptions
1528                    .push(cx.subscribe(repo, Self::on_repository_event))
1529            }
1530
1531            repo.update(cx, {
1532                let update = update.clone();
1533                |repo, cx| repo.apply_remote_update(update, is_new, cx)
1534            })?;
1535
1536            this.active_repo_id.get_or_insert_with(|| {
1537                cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
1538                id
1539            });
1540
1541            if let Some((client, project_id)) = this.downstream_client() {
1542                update.project_id = project_id.to_proto();
1543                client.send(update).log_err();
1544            }
1545            Ok(())
1546        })?
1547    }
1548
1549    async fn handle_remove_repository(
1550        this: Entity<Self>,
1551        envelope: TypedEnvelope<proto::RemoveRepository>,
1552        mut cx: AsyncApp,
1553    ) -> Result<()> {
1554        this.update(&mut cx, |this, cx| {
1555            let mut update = envelope.payload;
1556            let id = RepositoryId::from_proto(update.id);
1557            this.repositories.remove(&id);
1558            if let Some((client, project_id)) = this.downstream_client() {
1559                update.project_id = project_id.to_proto();
1560                client.send(update).log_err();
1561            }
1562            if this.active_repo_id == Some(id) {
1563                this.active_repo_id = None;
1564                cx.emit(GitStoreEvent::ActiveRepositoryChanged(None));
1565            }
1566            cx.emit(GitStoreEvent::RepositoryRemoved(id));
1567        })
1568    }
1569
1570    async fn handle_git_init(
1571        this: Entity<Self>,
1572        envelope: TypedEnvelope<proto::GitInit>,
1573        cx: AsyncApp,
1574    ) -> Result<proto::Ack> {
1575        let path: Arc<Path> = PathBuf::from(envelope.payload.abs_path).into();
1576        let name = envelope.payload.fallback_branch_name;
1577        cx.update(|cx| this.read(cx).git_init(path, name, cx))?
1578            .await?;
1579
1580        Ok(proto::Ack {})
1581    }
1582
1583    async fn handle_git_clone(
1584        this: Entity<Self>,
1585        envelope: TypedEnvelope<proto::GitClone>,
1586        cx: AsyncApp,
1587    ) -> Result<proto::GitCloneResponse> {
1588        let path: Arc<Path> = PathBuf::from(envelope.payload.abs_path).into();
1589        let repo_name = envelope.payload.remote_repo;
1590        let result = cx
1591            .update(|cx| this.read(cx).git_clone(repo_name, path, cx))?
1592            .await;
1593
1594        Ok(proto::GitCloneResponse {
1595            success: result.is_ok(),
1596        })
1597    }
1598
1599    async fn handle_fetch(
1600        this: Entity<Self>,
1601        envelope: TypedEnvelope<proto::Fetch>,
1602        mut cx: AsyncApp,
1603    ) -> Result<proto::RemoteMessageResponse> {
1604        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1605        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1606        let fetch_options = FetchOptions::from_proto(envelope.payload.remote);
1607        let askpass_id = envelope.payload.askpass_id;
1608
1609        let askpass = make_remote_delegate(
1610            this,
1611            envelope.payload.project_id,
1612            repository_id,
1613            askpass_id,
1614            &mut cx,
1615        );
1616
1617        let remote_output = repository_handle
1618            .update(&mut cx, |repository_handle, cx| {
1619                repository_handle.fetch(fetch_options, askpass, cx)
1620            })?
1621            .await??;
1622
1623        Ok(proto::RemoteMessageResponse {
1624            stdout: remote_output.stdout,
1625            stderr: remote_output.stderr,
1626        })
1627    }
1628
1629    async fn handle_push(
1630        this: Entity<Self>,
1631        envelope: TypedEnvelope<proto::Push>,
1632        mut cx: AsyncApp,
1633    ) -> Result<proto::RemoteMessageResponse> {
1634        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1635        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1636
1637        let askpass_id = envelope.payload.askpass_id;
1638        let askpass = make_remote_delegate(
1639            this,
1640            envelope.payload.project_id,
1641            repository_id,
1642            askpass_id,
1643            &mut cx,
1644        );
1645
1646        let options = envelope
1647            .payload
1648            .options
1649            .as_ref()
1650            .map(|_| match envelope.payload.options() {
1651                proto::push::PushOptions::SetUpstream => git::repository::PushOptions::SetUpstream,
1652                proto::push::PushOptions::Force => git::repository::PushOptions::Force,
1653            });
1654
1655        let branch_name = envelope.payload.branch_name.into();
1656        let remote_name = envelope.payload.remote_name.into();
1657
1658        let remote_output = repository_handle
1659            .update(&mut cx, |repository_handle, cx| {
1660                repository_handle.push(branch_name, remote_name, options, askpass, cx)
1661            })?
1662            .await??;
1663        Ok(proto::RemoteMessageResponse {
1664            stdout: remote_output.stdout,
1665            stderr: remote_output.stderr,
1666        })
1667    }
1668
1669    async fn handle_pull(
1670        this: Entity<Self>,
1671        envelope: TypedEnvelope<proto::Pull>,
1672        mut cx: AsyncApp,
1673    ) -> Result<proto::RemoteMessageResponse> {
1674        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1675        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1676        let askpass_id = envelope.payload.askpass_id;
1677        let askpass = make_remote_delegate(
1678            this,
1679            envelope.payload.project_id,
1680            repository_id,
1681            askpass_id,
1682            &mut cx,
1683        );
1684
1685        let branch_name = envelope.payload.branch_name.into();
1686        let remote_name = envelope.payload.remote_name.into();
1687
1688        let remote_message = repository_handle
1689            .update(&mut cx, |repository_handle, cx| {
1690                repository_handle.pull(branch_name, remote_name, askpass, cx)
1691            })?
1692            .await??;
1693
1694        Ok(proto::RemoteMessageResponse {
1695            stdout: remote_message.stdout,
1696            stderr: remote_message.stderr,
1697        })
1698    }
1699
1700    async fn handle_stage(
1701        this: Entity<Self>,
1702        envelope: TypedEnvelope<proto::Stage>,
1703        mut cx: AsyncApp,
1704    ) -> Result<proto::Ack> {
1705        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1706        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1707
1708        let entries = envelope
1709            .payload
1710            .paths
1711            .into_iter()
1712            .map(PathBuf::from)
1713            .map(RepoPath::new)
1714            .collect();
1715
1716        repository_handle
1717            .update(&mut cx, |repository_handle, cx| {
1718                repository_handle.stage_entries(entries, cx)
1719            })?
1720            .await?;
1721        Ok(proto::Ack {})
1722    }
1723
1724    async fn handle_unstage(
1725        this: Entity<Self>,
1726        envelope: TypedEnvelope<proto::Unstage>,
1727        mut cx: AsyncApp,
1728    ) -> Result<proto::Ack> {
1729        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1730        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1731
1732        let entries = envelope
1733            .payload
1734            .paths
1735            .into_iter()
1736            .map(PathBuf::from)
1737            .map(RepoPath::new)
1738            .collect();
1739
1740        repository_handle
1741            .update(&mut cx, |repository_handle, cx| {
1742                repository_handle.unstage_entries(entries, cx)
1743            })?
1744            .await?;
1745
1746        Ok(proto::Ack {})
1747    }
1748
1749    async fn handle_stash(
1750        this: Entity<Self>,
1751        envelope: TypedEnvelope<proto::Stash>,
1752        mut cx: AsyncApp,
1753    ) -> Result<proto::Ack> {
1754        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1755        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1756
1757        let entries = envelope
1758            .payload
1759            .paths
1760            .into_iter()
1761            .map(PathBuf::from)
1762            .map(RepoPath::new)
1763            .collect();
1764
1765        repository_handle
1766            .update(&mut cx, |repository_handle, cx| {
1767                repository_handle.stash_entries(entries, cx)
1768            })?
1769            .await?;
1770
1771        Ok(proto::Ack {})
1772    }
1773
1774    async fn handle_stash_pop(
1775        this: Entity<Self>,
1776        envelope: TypedEnvelope<proto::StashPop>,
1777        mut cx: AsyncApp,
1778    ) -> Result<proto::Ack> {
1779        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1780        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1781
1782        repository_handle
1783            .update(&mut cx, |repository_handle, cx| {
1784                repository_handle.stash_pop(cx)
1785            })?
1786            .await?;
1787
1788        Ok(proto::Ack {})
1789    }
1790
1791    async fn handle_set_index_text(
1792        this: Entity<Self>,
1793        envelope: TypedEnvelope<proto::SetIndexText>,
1794        mut cx: AsyncApp,
1795    ) -> Result<proto::Ack> {
1796        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1797        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1798        let repo_path = RepoPath::from_str(&envelope.payload.path);
1799
1800        repository_handle
1801            .update(&mut cx, |repository_handle, cx| {
1802                repository_handle.spawn_set_index_text_job(
1803                    repo_path,
1804                    envelope.payload.text,
1805                    None,
1806                    cx,
1807                )
1808            })?
1809            .await??;
1810        Ok(proto::Ack {})
1811    }
1812
1813    async fn handle_commit(
1814        this: Entity<Self>,
1815        envelope: TypedEnvelope<proto::Commit>,
1816        mut cx: AsyncApp,
1817    ) -> Result<proto::Ack> {
1818        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1819        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1820
1821        let message = SharedString::from(envelope.payload.message);
1822        let name = envelope.payload.name.map(SharedString::from);
1823        let email = envelope.payload.email.map(SharedString::from);
1824        let options = envelope.payload.options.unwrap_or_default();
1825
1826        repository_handle
1827            .update(&mut cx, |repository_handle, cx| {
1828                repository_handle.commit(
1829                    message,
1830                    name.zip(email),
1831                    CommitOptions {
1832                        amend: options.amend,
1833                        signoff: options.signoff,
1834                    },
1835                    cx,
1836                )
1837            })?
1838            .await??;
1839        Ok(proto::Ack {})
1840    }
1841
1842    async fn handle_get_remotes(
1843        this: Entity<Self>,
1844        envelope: TypedEnvelope<proto::GetRemotes>,
1845        mut cx: AsyncApp,
1846    ) -> Result<proto::GetRemotesResponse> {
1847        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1848        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1849
1850        let branch_name = envelope.payload.branch_name;
1851
1852        let remotes = repository_handle
1853            .update(&mut cx, |repository_handle, _| {
1854                repository_handle.get_remotes(branch_name)
1855            })?
1856            .await??;
1857
1858        Ok(proto::GetRemotesResponse {
1859            remotes: remotes
1860                .into_iter()
1861                .map(|remotes| proto::get_remotes_response::Remote {
1862                    name: remotes.name.to_string(),
1863                })
1864                .collect::<Vec<_>>(),
1865        })
1866    }
1867
1868    async fn handle_get_branches(
1869        this: Entity<Self>,
1870        envelope: TypedEnvelope<proto::GitGetBranches>,
1871        mut cx: AsyncApp,
1872    ) -> Result<proto::GitBranchesResponse> {
1873        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1874        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1875
1876        let branches = repository_handle
1877            .update(&mut cx, |repository_handle, _| repository_handle.branches())?
1878            .await??;
1879
1880        Ok(proto::GitBranchesResponse {
1881            branches: branches
1882                .into_iter()
1883                .map(|branch| branch_to_proto(&branch))
1884                .collect::<Vec<_>>(),
1885        })
1886    }
1887    async fn handle_get_default_branch(
1888        this: Entity<Self>,
1889        envelope: TypedEnvelope<proto::GetDefaultBranch>,
1890        mut cx: AsyncApp,
1891    ) -> Result<proto::GetDefaultBranchResponse> {
1892        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1893        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1894
1895        let branch = repository_handle
1896            .update(&mut cx, |repository_handle, _| {
1897                repository_handle.default_branch()
1898            })?
1899            .await??
1900            .map(Into::into);
1901
1902        Ok(proto::GetDefaultBranchResponse { branch })
1903    }
1904    async fn handle_create_branch(
1905        this: Entity<Self>,
1906        envelope: TypedEnvelope<proto::GitCreateBranch>,
1907        mut cx: AsyncApp,
1908    ) -> Result<proto::Ack> {
1909        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1910        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1911        let branch_name = envelope.payload.branch_name;
1912
1913        repository_handle
1914            .update(&mut cx, |repository_handle, _| {
1915                repository_handle.create_branch(branch_name)
1916            })?
1917            .await??;
1918
1919        Ok(proto::Ack {})
1920    }
1921
1922    async fn handle_change_branch(
1923        this: Entity<Self>,
1924        envelope: TypedEnvelope<proto::GitChangeBranch>,
1925        mut cx: AsyncApp,
1926    ) -> Result<proto::Ack> {
1927        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1928        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1929        let branch_name = envelope.payload.branch_name;
1930
1931        repository_handle
1932            .update(&mut cx, |repository_handle, _| {
1933                repository_handle.change_branch(branch_name)
1934            })?
1935            .await??;
1936
1937        Ok(proto::Ack {})
1938    }
1939
1940    async fn handle_show(
1941        this: Entity<Self>,
1942        envelope: TypedEnvelope<proto::GitShow>,
1943        mut cx: AsyncApp,
1944    ) -> Result<proto::GitCommitDetails> {
1945        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1946        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1947
1948        let commit = repository_handle
1949            .update(&mut cx, |repository_handle, _| {
1950                repository_handle.show(envelope.payload.commit)
1951            })?
1952            .await??;
1953        Ok(proto::GitCommitDetails {
1954            sha: commit.sha.into(),
1955            message: commit.message.into(),
1956            commit_timestamp: commit.commit_timestamp,
1957            author_email: commit.author_email.into(),
1958            author_name: commit.author_name.into(),
1959        })
1960    }
1961
1962    async fn handle_load_commit_diff(
1963        this: Entity<Self>,
1964        envelope: TypedEnvelope<proto::LoadCommitDiff>,
1965        mut cx: AsyncApp,
1966    ) -> Result<proto::LoadCommitDiffResponse> {
1967        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1968        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1969
1970        let commit_diff = repository_handle
1971            .update(&mut cx, |repository_handle, _| {
1972                repository_handle.load_commit_diff(envelope.payload.commit)
1973            })?
1974            .await??;
1975        Ok(proto::LoadCommitDiffResponse {
1976            files: commit_diff
1977                .files
1978                .into_iter()
1979                .map(|file| proto::CommitFile {
1980                    path: file.path.to_string(),
1981                    old_text: file.old_text,
1982                    new_text: file.new_text,
1983                })
1984                .collect(),
1985        })
1986    }
1987
1988    async fn handle_reset(
1989        this: Entity<Self>,
1990        envelope: TypedEnvelope<proto::GitReset>,
1991        mut cx: AsyncApp,
1992    ) -> Result<proto::Ack> {
1993        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1994        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1995
1996        let mode = match envelope.payload.mode() {
1997            git_reset::ResetMode::Soft => ResetMode::Soft,
1998            git_reset::ResetMode::Mixed => ResetMode::Mixed,
1999        };
2000
2001        repository_handle
2002            .update(&mut cx, |repository_handle, cx| {
2003                repository_handle.reset(envelope.payload.commit, mode, cx)
2004            })?
2005            .await??;
2006        Ok(proto::Ack {})
2007    }
2008
2009    async fn handle_checkout_files(
2010        this: Entity<Self>,
2011        envelope: TypedEnvelope<proto::GitCheckoutFiles>,
2012        mut cx: AsyncApp,
2013    ) -> Result<proto::Ack> {
2014        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2015        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2016        let paths = envelope
2017            .payload
2018            .paths
2019            .iter()
2020            .map(|s| RepoPath::from_str(s))
2021            .collect();
2022
2023        repository_handle
2024            .update(&mut cx, |repository_handle, cx| {
2025                repository_handle.checkout_files(&envelope.payload.commit, paths, cx)
2026            })?
2027            .await??;
2028        Ok(proto::Ack {})
2029    }
2030
2031    async fn handle_open_commit_message_buffer(
2032        this: Entity<Self>,
2033        envelope: TypedEnvelope<proto::OpenCommitMessageBuffer>,
2034        mut cx: AsyncApp,
2035    ) -> Result<proto::OpenBufferResponse> {
2036        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2037        let repository = Self::repository_for_request(&this, repository_id, &mut cx)?;
2038        let buffer = repository
2039            .update(&mut cx, |repository, cx| {
2040                repository.open_commit_buffer(None, this.read(cx).buffer_store.clone(), cx)
2041            })?
2042            .await?;
2043
2044        let buffer_id = buffer.read_with(&cx, |buffer, _| buffer.remote_id())?;
2045        this.update(&mut cx, |this, cx| {
2046            this.buffer_store.update(cx, |buffer_store, cx| {
2047                buffer_store
2048                    .create_buffer_for_peer(
2049                        &buffer,
2050                        envelope.original_sender_id.unwrap_or(envelope.sender_id),
2051                        cx,
2052                    )
2053                    .detach_and_log_err(cx);
2054            })
2055        })?;
2056
2057        Ok(proto::OpenBufferResponse {
2058            buffer_id: buffer_id.to_proto(),
2059        })
2060    }
2061
2062    async fn handle_askpass(
2063        this: Entity<Self>,
2064        envelope: TypedEnvelope<proto::AskPassRequest>,
2065        mut cx: AsyncApp,
2066    ) -> Result<proto::AskPassResponse> {
2067        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2068        let repository = Self::repository_for_request(&this, repository_id, &mut cx)?;
2069
2070        let delegates = cx.update(|cx| repository.read(cx).askpass_delegates.clone())?;
2071        let Some(mut askpass) = delegates.lock().remove(&envelope.payload.askpass_id) else {
2072            debug_panic!("no askpass found");
2073            anyhow::bail!("no askpass found");
2074        };
2075
2076        let response = askpass.ask_password(envelope.payload.prompt).await?;
2077
2078        delegates
2079            .lock()
2080            .insert(envelope.payload.askpass_id, askpass);
2081
2082        Ok(proto::AskPassResponse { response })
2083    }
2084
2085    async fn handle_check_for_pushed_commits(
2086        this: Entity<Self>,
2087        envelope: TypedEnvelope<proto::CheckForPushedCommits>,
2088        mut cx: AsyncApp,
2089    ) -> Result<proto::CheckForPushedCommitsResponse> {
2090        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2091        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2092
2093        let branches = repository_handle
2094            .update(&mut cx, |repository_handle, _| {
2095                repository_handle.check_for_pushed_commits()
2096            })?
2097            .await??;
2098        Ok(proto::CheckForPushedCommitsResponse {
2099            pushed_to: branches
2100                .into_iter()
2101                .map(|commit| commit.to_string())
2102                .collect(),
2103        })
2104    }
2105
2106    async fn handle_git_diff(
2107        this: Entity<Self>,
2108        envelope: TypedEnvelope<proto::GitDiff>,
2109        mut cx: AsyncApp,
2110    ) -> Result<proto::GitDiffResponse> {
2111        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2112        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2113        let diff_type = match envelope.payload.diff_type() {
2114            proto::git_diff::DiffType::HeadToIndex => DiffType::HeadToIndex,
2115            proto::git_diff::DiffType::HeadToWorktree => DiffType::HeadToWorktree,
2116        };
2117
2118        let mut diff = repository_handle
2119            .update(&mut cx, |repository_handle, cx| {
2120                repository_handle.diff(diff_type, cx)
2121            })?
2122            .await??;
2123        const ONE_MB: usize = 1_000_000;
2124        if diff.len() > ONE_MB {
2125            diff = diff.chars().take(ONE_MB).collect()
2126        }
2127
2128        Ok(proto::GitDiffResponse { diff })
2129    }
2130
2131    async fn handle_open_unstaged_diff(
2132        this: Entity<Self>,
2133        request: TypedEnvelope<proto::OpenUnstagedDiff>,
2134        mut cx: AsyncApp,
2135    ) -> Result<proto::OpenUnstagedDiffResponse> {
2136        let buffer_id = BufferId::new(request.payload.buffer_id)?;
2137        let diff = this
2138            .update(&mut cx, |this, cx| {
2139                let buffer = this.buffer_store.read(cx).get(buffer_id)?;
2140                Some(this.open_unstaged_diff(buffer, cx))
2141            })?
2142            .context("missing buffer")?
2143            .await?;
2144        this.update(&mut cx, |this, _| {
2145            let shared_diffs = this
2146                .shared_diffs
2147                .entry(request.original_sender_id.unwrap_or(request.sender_id))
2148                .or_default();
2149            shared_diffs.entry(buffer_id).or_default().unstaged = Some(diff.clone());
2150        })?;
2151        let staged_text = diff.read_with(&cx, |diff, _| diff.base_text_string())?;
2152        Ok(proto::OpenUnstagedDiffResponse { staged_text })
2153    }
2154
2155    async fn handle_open_uncommitted_diff(
2156        this: Entity<Self>,
2157        request: TypedEnvelope<proto::OpenUncommittedDiff>,
2158        mut cx: AsyncApp,
2159    ) -> Result<proto::OpenUncommittedDiffResponse> {
2160        let buffer_id = BufferId::new(request.payload.buffer_id)?;
2161        let diff = this
2162            .update(&mut cx, |this, cx| {
2163                let buffer = this.buffer_store.read(cx).get(buffer_id)?;
2164                Some(this.open_uncommitted_diff(buffer, cx))
2165            })?
2166            .context("missing buffer")?
2167            .await?;
2168        this.update(&mut cx, |this, _| {
2169            let shared_diffs = this
2170                .shared_diffs
2171                .entry(request.original_sender_id.unwrap_or(request.sender_id))
2172                .or_default();
2173            shared_diffs.entry(buffer_id).or_default().uncommitted = Some(diff.clone());
2174        })?;
2175        diff.read_with(&cx, |diff, cx| {
2176            use proto::open_uncommitted_diff_response::Mode;
2177
2178            let unstaged_diff = diff.secondary_diff();
2179            let index_snapshot = unstaged_diff.and_then(|diff| {
2180                let diff = diff.read(cx);
2181                diff.base_text_exists().then(|| diff.base_text())
2182            });
2183
2184            let mode;
2185            let staged_text;
2186            let committed_text;
2187            if diff.base_text_exists() {
2188                let committed_snapshot = diff.base_text();
2189                committed_text = Some(committed_snapshot.text());
2190                if let Some(index_text) = index_snapshot {
2191                    if index_text.remote_id() == committed_snapshot.remote_id() {
2192                        mode = Mode::IndexMatchesHead;
2193                        staged_text = None;
2194                    } else {
2195                        mode = Mode::IndexAndHead;
2196                        staged_text = Some(index_text.text());
2197                    }
2198                } else {
2199                    mode = Mode::IndexAndHead;
2200                    staged_text = None;
2201                }
2202            } else {
2203                mode = Mode::IndexAndHead;
2204                committed_text = None;
2205                staged_text = index_snapshot.as_ref().map(|buffer| buffer.text());
2206            }
2207
2208            proto::OpenUncommittedDiffResponse {
2209                committed_text,
2210                staged_text,
2211                mode: mode.into(),
2212            }
2213        })
2214    }
2215
2216    async fn handle_update_diff_bases(
2217        this: Entity<Self>,
2218        request: TypedEnvelope<proto::UpdateDiffBases>,
2219        mut cx: AsyncApp,
2220    ) -> Result<()> {
2221        let buffer_id = BufferId::new(request.payload.buffer_id)?;
2222        this.update(&mut cx, |this, cx| {
2223            if let Some(diff_state) = this.diffs.get_mut(&buffer_id)
2224                && let Some(buffer) = this.buffer_store.read(cx).get(buffer_id)
2225            {
2226                let buffer = buffer.read(cx).text_snapshot();
2227                diff_state.update(cx, |diff_state, cx| {
2228                    diff_state.handle_base_texts_updated(buffer, request.payload, cx);
2229                })
2230            }
2231        })
2232    }
2233
2234    async fn handle_blame_buffer(
2235        this: Entity<Self>,
2236        envelope: TypedEnvelope<proto::BlameBuffer>,
2237        mut cx: AsyncApp,
2238    ) -> Result<proto::BlameBufferResponse> {
2239        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2240        let version = deserialize_version(&envelope.payload.version);
2241        let buffer = this.read_with(&cx, |this, cx| {
2242            this.buffer_store.read(cx).get_existing(buffer_id)
2243        })??;
2244        buffer
2245            .update(&mut cx, |buffer, _| {
2246                buffer.wait_for_version(version.clone())
2247            })?
2248            .await?;
2249        let blame = this
2250            .update(&mut cx, |this, cx| {
2251                this.blame_buffer(&buffer, Some(version), cx)
2252            })?
2253            .await?;
2254        Ok(serialize_blame_buffer_response(blame))
2255    }
2256
2257    async fn handle_get_permalink_to_line(
2258        this: Entity<Self>,
2259        envelope: TypedEnvelope<proto::GetPermalinkToLine>,
2260        mut cx: AsyncApp,
2261    ) -> Result<proto::GetPermalinkToLineResponse> {
2262        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2263        // let version = deserialize_version(&envelope.payload.version);
2264        let selection = {
2265            let proto_selection = envelope
2266                .payload
2267                .selection
2268                .context("no selection to get permalink for defined")?;
2269            proto_selection.start as u32..proto_selection.end as u32
2270        };
2271        let buffer = this.read_with(&cx, |this, cx| {
2272            this.buffer_store.read(cx).get_existing(buffer_id)
2273        })??;
2274        let permalink = this
2275            .update(&mut cx, |this, cx| {
2276                this.get_permalink_to_line(&buffer, selection, cx)
2277            })?
2278            .await?;
2279        Ok(proto::GetPermalinkToLineResponse {
2280            permalink: permalink.to_string(),
2281        })
2282    }
2283
2284    fn repository_for_request(
2285        this: &Entity<Self>,
2286        id: RepositoryId,
2287        cx: &mut AsyncApp,
2288    ) -> Result<Entity<Repository>> {
2289        this.read_with(cx, |this, _| {
2290            this.repositories
2291                .get(&id)
2292                .context("missing repository handle")
2293                .cloned()
2294        })?
2295    }
2296
2297    pub fn repo_snapshots(&self, cx: &App) -> HashMap<RepositoryId, RepositorySnapshot> {
2298        self.repositories
2299            .iter()
2300            .map(|(id, repo)| (*id, repo.read(cx).snapshot.clone()))
2301            .collect()
2302    }
2303
2304    fn process_updated_entries(
2305        &self,
2306        worktree: &Entity<Worktree>,
2307        updated_entries: &[(Arc<Path>, ProjectEntryId, PathChange)],
2308        cx: &mut App,
2309    ) -> Task<HashMap<Entity<Repository>, Vec<RepoPath>>> {
2310        let mut repo_paths = self
2311            .repositories
2312            .values()
2313            .map(|repo| (repo.read(cx).work_directory_abs_path.clone(), repo.clone()))
2314            .collect::<Vec<_>>();
2315        let mut entries: Vec<_> = updated_entries
2316            .iter()
2317            .map(|(path, _, _)| path.clone())
2318            .collect();
2319        entries.sort();
2320        let worktree = worktree.read(cx);
2321
2322        let entries = entries
2323            .into_iter()
2324            .filter_map(|path| worktree.absolutize(&path).ok())
2325            .collect::<Arc<[_]>>();
2326
2327        let executor = cx.background_executor().clone();
2328        cx.background_executor().spawn(async move {
2329            repo_paths.sort_by(|lhs, rhs| lhs.0.cmp(&rhs.0));
2330            let mut paths_by_git_repo = HashMap::<_, Vec<_>>::default();
2331            let mut tasks = FuturesOrdered::new();
2332            for (repo_path, repo) in repo_paths.into_iter().rev() {
2333                let entries = entries.clone();
2334                let task = executor.spawn(async move {
2335                    // Find all repository paths that belong to this repo
2336                    let mut ix = entries.partition_point(|path| path < &*repo_path);
2337                    if ix == entries.len() {
2338                        return None;
2339                    };
2340
2341                    let mut paths = Vec::new();
2342                    // All paths prefixed by a given repo will constitute a continuous range.
2343                    while let Some(path) = entries.get(ix)
2344                        && let Some(repo_path) =
2345                            RepositorySnapshot::abs_path_to_repo_path_inner(&repo_path, path)
2346                    {
2347                        paths.push((repo_path, ix));
2348                        ix += 1;
2349                    }
2350                    if paths.is_empty() {
2351                        None
2352                    } else {
2353                        Some((repo, paths))
2354                    }
2355                });
2356                tasks.push_back(task);
2357            }
2358
2359            // Now, let's filter out the "duplicate" entries that were processed by multiple distinct repos.
2360            let mut path_was_used = vec![false; entries.len()];
2361            let tasks = tasks.collect::<Vec<_>>().await;
2362            // Process tasks from the back: iterating backwards allows us to see more-specific paths first.
2363            // We always want to assign a path to it's innermost repository.
2364            for t in tasks {
2365                let Some((repo, paths)) = t else {
2366                    continue;
2367                };
2368                let entry = paths_by_git_repo.entry(repo).or_default();
2369                for (repo_path, ix) in paths {
2370                    if path_was_used[ix] {
2371                        continue;
2372                    }
2373                    path_was_used[ix] = true;
2374                    entry.push(repo_path);
2375                }
2376            }
2377
2378            paths_by_git_repo
2379        })
2380    }
2381}
2382
2383impl BufferGitState {
2384    fn new(_git_store: WeakEntity<GitStore>) -> Self {
2385        Self {
2386            unstaged_diff: Default::default(),
2387            uncommitted_diff: Default::default(),
2388            recalculate_diff_task: Default::default(),
2389            language: Default::default(),
2390            language_registry: Default::default(),
2391            recalculating_tx: postage::watch::channel_with(false).0,
2392            hunk_staging_operation_count: 0,
2393            hunk_staging_operation_count_as_of_write: 0,
2394            head_text: Default::default(),
2395            index_text: Default::default(),
2396            head_changed: Default::default(),
2397            index_changed: Default::default(),
2398            language_changed: Default::default(),
2399            conflict_updated_futures: Default::default(),
2400            conflict_set: Default::default(),
2401            reparse_conflict_markers_task: Default::default(),
2402        }
2403    }
2404
2405    fn buffer_language_changed(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) {
2406        self.language = buffer.read(cx).language().cloned();
2407        self.language_changed = true;
2408        let _ = self.recalculate_diffs(buffer.read(cx).text_snapshot(), cx);
2409    }
2410
2411    fn reparse_conflict_markers(
2412        &mut self,
2413        buffer: text::BufferSnapshot,
2414        cx: &mut Context<Self>,
2415    ) -> oneshot::Receiver<()> {
2416        let (tx, rx) = oneshot::channel();
2417
2418        let Some(conflict_set) = self
2419            .conflict_set
2420            .as_ref()
2421            .and_then(|conflict_set| conflict_set.upgrade())
2422        else {
2423            return rx;
2424        };
2425
2426        let old_snapshot = conflict_set.read_with(cx, |conflict_set, _| {
2427            if conflict_set.has_conflict {
2428                Some(conflict_set.snapshot())
2429            } else {
2430                None
2431            }
2432        });
2433
2434        if let Some(old_snapshot) = old_snapshot {
2435            self.conflict_updated_futures.push(tx);
2436            self.reparse_conflict_markers_task = Some(cx.spawn(async move |this, cx| {
2437                let (snapshot, changed_range) = cx
2438                    .background_spawn(async move {
2439                        let new_snapshot = ConflictSet::parse(&buffer);
2440                        let changed_range = old_snapshot.compare(&new_snapshot, &buffer);
2441                        (new_snapshot, changed_range)
2442                    })
2443                    .await;
2444                this.update(cx, |this, cx| {
2445                    if let Some(conflict_set) = &this.conflict_set {
2446                        conflict_set
2447                            .update(cx, |conflict_set, cx| {
2448                                conflict_set.set_snapshot(snapshot, changed_range, cx);
2449                            })
2450                            .ok();
2451                    }
2452                    let futures = std::mem::take(&mut this.conflict_updated_futures);
2453                    for tx in futures {
2454                        tx.send(()).ok();
2455                    }
2456                })
2457            }))
2458        }
2459
2460        rx
2461    }
2462
2463    fn unstaged_diff(&self) -> Option<Entity<BufferDiff>> {
2464        self.unstaged_diff.as_ref().and_then(|set| set.upgrade())
2465    }
2466
2467    fn uncommitted_diff(&self) -> Option<Entity<BufferDiff>> {
2468        self.uncommitted_diff.as_ref().and_then(|set| set.upgrade())
2469    }
2470
2471    fn handle_base_texts_updated(
2472        &mut self,
2473        buffer: text::BufferSnapshot,
2474        message: proto::UpdateDiffBases,
2475        cx: &mut Context<Self>,
2476    ) {
2477        use proto::update_diff_bases::Mode;
2478
2479        let Some(mode) = Mode::from_i32(message.mode) else {
2480            return;
2481        };
2482
2483        let diff_bases_change = match mode {
2484            Mode::HeadOnly => DiffBasesChange::SetHead(message.committed_text),
2485            Mode::IndexOnly => DiffBasesChange::SetIndex(message.staged_text),
2486            Mode::IndexMatchesHead => DiffBasesChange::SetBoth(message.committed_text),
2487            Mode::IndexAndHead => DiffBasesChange::SetEach {
2488                index: message.staged_text,
2489                head: message.committed_text,
2490            },
2491        };
2492
2493        self.diff_bases_changed(buffer, Some(diff_bases_change), cx);
2494    }
2495
2496    pub fn wait_for_recalculation(&mut self) -> Option<impl Future<Output = ()> + use<>> {
2497        if *self.recalculating_tx.borrow() {
2498            let mut rx = self.recalculating_tx.subscribe();
2499            Some(async move {
2500                loop {
2501                    let is_recalculating = rx.recv().await;
2502                    if is_recalculating != Some(true) {
2503                        break;
2504                    }
2505                }
2506            })
2507        } else {
2508            None
2509        }
2510    }
2511
2512    fn diff_bases_changed(
2513        &mut self,
2514        buffer: text::BufferSnapshot,
2515        diff_bases_change: Option<DiffBasesChange>,
2516        cx: &mut Context<Self>,
2517    ) {
2518        match diff_bases_change {
2519            Some(DiffBasesChange::SetIndex(index)) => {
2520                self.index_text = index.map(|mut index| {
2521                    text::LineEnding::normalize(&mut index);
2522                    Arc::new(index)
2523                });
2524                self.index_changed = true;
2525            }
2526            Some(DiffBasesChange::SetHead(head)) => {
2527                self.head_text = head.map(|mut head| {
2528                    text::LineEnding::normalize(&mut head);
2529                    Arc::new(head)
2530                });
2531                self.head_changed = true;
2532            }
2533            Some(DiffBasesChange::SetBoth(text)) => {
2534                let text = text.map(|mut text| {
2535                    text::LineEnding::normalize(&mut text);
2536                    Arc::new(text)
2537                });
2538                self.head_text = text.clone();
2539                self.index_text = text;
2540                self.head_changed = true;
2541                self.index_changed = true;
2542            }
2543            Some(DiffBasesChange::SetEach { index, head }) => {
2544                self.index_text = index.map(|mut index| {
2545                    text::LineEnding::normalize(&mut index);
2546                    Arc::new(index)
2547                });
2548                self.index_changed = true;
2549                self.head_text = head.map(|mut head| {
2550                    text::LineEnding::normalize(&mut head);
2551                    Arc::new(head)
2552                });
2553                self.head_changed = true;
2554            }
2555            None => {}
2556        }
2557
2558        self.recalculate_diffs(buffer, cx)
2559    }
2560
2561    fn recalculate_diffs(&mut self, buffer: text::BufferSnapshot, cx: &mut Context<Self>) {
2562        *self.recalculating_tx.borrow_mut() = true;
2563
2564        let language = self.language.clone();
2565        let language_registry = self.language_registry.clone();
2566        let unstaged_diff = self.unstaged_diff();
2567        let uncommitted_diff = self.uncommitted_diff();
2568        let head = self.head_text.clone();
2569        let index = self.index_text.clone();
2570        let index_changed = self.index_changed;
2571        let head_changed = self.head_changed;
2572        let language_changed = self.language_changed;
2573        let prev_hunk_staging_operation_count = self.hunk_staging_operation_count_as_of_write;
2574        let index_matches_head = match (self.index_text.as_ref(), self.head_text.as_ref()) {
2575            (Some(index), Some(head)) => Arc::ptr_eq(index, head),
2576            (None, None) => true,
2577            _ => false,
2578        };
2579        self.recalculate_diff_task = Some(cx.spawn(async move |this, cx| {
2580            log::debug!(
2581                "start recalculating diffs for buffer {}",
2582                buffer.remote_id()
2583            );
2584
2585            let mut new_unstaged_diff = None;
2586            if let Some(unstaged_diff) = &unstaged_diff {
2587                new_unstaged_diff = Some(
2588                    BufferDiff::update_diff(
2589                        unstaged_diff.clone(),
2590                        buffer.clone(),
2591                        index,
2592                        index_changed,
2593                        language_changed,
2594                        language.clone(),
2595                        language_registry.clone(),
2596                        cx,
2597                    )
2598                    .await?,
2599                );
2600            }
2601
2602            let mut new_uncommitted_diff = None;
2603            if let Some(uncommitted_diff) = &uncommitted_diff {
2604                new_uncommitted_diff = if index_matches_head {
2605                    new_unstaged_diff.clone()
2606                } else {
2607                    Some(
2608                        BufferDiff::update_diff(
2609                            uncommitted_diff.clone(),
2610                            buffer.clone(),
2611                            head,
2612                            head_changed,
2613                            language_changed,
2614                            language.clone(),
2615                            language_registry.clone(),
2616                            cx,
2617                        )
2618                        .await?,
2619                    )
2620                }
2621            }
2622
2623            let cancel = this.update(cx, |this, _| {
2624                // This checks whether all pending stage/unstage operations
2625                // have quiesced (i.e. both the corresponding write and the
2626                // read of that write have completed). If not, then we cancel
2627                // this recalculation attempt to avoid invalidating pending
2628                // state too quickly; another recalculation will come along
2629                // later and clear the pending state once the state of the index has settled.
2630                if this.hunk_staging_operation_count > prev_hunk_staging_operation_count {
2631                    *this.recalculating_tx.borrow_mut() = false;
2632                    true
2633                } else {
2634                    false
2635                }
2636            })?;
2637            if cancel {
2638                log::debug!(
2639                    concat!(
2640                        "aborting recalculating diffs for buffer {}",
2641                        "due to subsequent hunk operations",
2642                    ),
2643                    buffer.remote_id()
2644                );
2645                return Ok(());
2646            }
2647
2648            let unstaged_changed_range = if let Some((unstaged_diff, new_unstaged_diff)) =
2649                unstaged_diff.as_ref().zip(new_unstaged_diff.clone())
2650            {
2651                unstaged_diff.update(cx, |diff, cx| {
2652                    if language_changed {
2653                        diff.language_changed(cx);
2654                    }
2655                    diff.set_snapshot(new_unstaged_diff, &buffer, cx)
2656                })?
2657            } else {
2658                None
2659            };
2660
2661            if let Some((uncommitted_diff, new_uncommitted_diff)) =
2662                uncommitted_diff.as_ref().zip(new_uncommitted_diff.clone())
2663            {
2664                uncommitted_diff.update(cx, |diff, cx| {
2665                    if language_changed {
2666                        diff.language_changed(cx);
2667                    }
2668                    diff.set_snapshot_with_secondary(
2669                        new_uncommitted_diff,
2670                        &buffer,
2671                        unstaged_changed_range,
2672                        true,
2673                        cx,
2674                    );
2675                })?;
2676            }
2677
2678            log::debug!(
2679                "finished recalculating diffs for buffer {}",
2680                buffer.remote_id()
2681            );
2682
2683            if let Some(this) = this.upgrade() {
2684                this.update(cx, |this, _| {
2685                    this.index_changed = false;
2686                    this.head_changed = false;
2687                    this.language_changed = false;
2688                    *this.recalculating_tx.borrow_mut() = false;
2689                })?;
2690            }
2691
2692            Ok(())
2693        }));
2694    }
2695}
2696
2697fn make_remote_delegate(
2698    this: Entity<GitStore>,
2699    project_id: u64,
2700    repository_id: RepositoryId,
2701    askpass_id: u64,
2702    cx: &mut AsyncApp,
2703) -> AskPassDelegate {
2704    AskPassDelegate::new(cx, move |prompt, tx, cx| {
2705        this.update(cx, |this, cx| {
2706            let Some((client, _)) = this.downstream_client() else {
2707                return;
2708            };
2709            let response = client.request(proto::AskPassRequest {
2710                project_id,
2711                repository_id: repository_id.to_proto(),
2712                askpass_id,
2713                prompt,
2714            });
2715            cx.spawn(async move |_, _| {
2716                tx.send(response.await?.response).ok();
2717                anyhow::Ok(())
2718            })
2719            .detach_and_log_err(cx);
2720        })
2721        .log_err();
2722    })
2723}
2724
2725impl RepositoryId {
2726    pub fn to_proto(self) -> u64 {
2727        self.0
2728    }
2729
2730    pub fn from_proto(id: u64) -> Self {
2731        RepositoryId(id)
2732    }
2733}
2734
2735impl RepositorySnapshot {
2736    fn empty(id: RepositoryId, work_directory_abs_path: Arc<Path>) -> Self {
2737        Self {
2738            id,
2739            statuses_by_path: Default::default(),
2740            work_directory_abs_path,
2741            branch: None,
2742            head_commit: None,
2743            scan_id: 0,
2744            merge: Default::default(),
2745            remote_origin_url: None,
2746            remote_upstream_url: None,
2747        }
2748    }
2749
2750    fn initial_update(&self, project_id: u64) -> proto::UpdateRepository {
2751        proto::UpdateRepository {
2752            branch_summary: self.branch.as_ref().map(branch_to_proto),
2753            head_commit_details: self.head_commit.as_ref().map(commit_details_to_proto),
2754            updated_statuses: self
2755                .statuses_by_path
2756                .iter()
2757                .map(|entry| entry.to_proto())
2758                .collect(),
2759            removed_statuses: Default::default(),
2760            current_merge_conflicts: self
2761                .merge
2762                .conflicted_paths
2763                .iter()
2764                .map(|repo_path| repo_path.to_proto())
2765                .collect(),
2766            merge_message: self.merge.message.as_ref().map(|msg| msg.to_string()),
2767            project_id,
2768            id: self.id.to_proto(),
2769            abs_path: self.work_directory_abs_path.to_proto(),
2770            entry_ids: vec![self.id.to_proto()],
2771            scan_id: self.scan_id,
2772            is_last_update: true,
2773        }
2774    }
2775
2776    fn build_update(&self, old: &Self, project_id: u64) -> proto::UpdateRepository {
2777        let mut updated_statuses: Vec<proto::StatusEntry> = Vec::new();
2778        let mut removed_statuses: Vec<String> = Vec::new();
2779
2780        let mut new_statuses = self.statuses_by_path.iter().peekable();
2781        let mut old_statuses = old.statuses_by_path.iter().peekable();
2782
2783        let mut current_new_entry = new_statuses.next();
2784        let mut current_old_entry = old_statuses.next();
2785        loop {
2786            match (current_new_entry, current_old_entry) {
2787                (Some(new_entry), Some(old_entry)) => {
2788                    match new_entry.repo_path.cmp(&old_entry.repo_path) {
2789                        Ordering::Less => {
2790                            updated_statuses.push(new_entry.to_proto());
2791                            current_new_entry = new_statuses.next();
2792                        }
2793                        Ordering::Equal => {
2794                            if new_entry.status != old_entry.status {
2795                                updated_statuses.push(new_entry.to_proto());
2796                            }
2797                            current_old_entry = old_statuses.next();
2798                            current_new_entry = new_statuses.next();
2799                        }
2800                        Ordering::Greater => {
2801                            removed_statuses.push(old_entry.repo_path.as_ref().to_proto());
2802                            current_old_entry = old_statuses.next();
2803                        }
2804                    }
2805                }
2806                (None, Some(old_entry)) => {
2807                    removed_statuses.push(old_entry.repo_path.as_ref().to_proto());
2808                    current_old_entry = old_statuses.next();
2809                }
2810                (Some(new_entry), None) => {
2811                    updated_statuses.push(new_entry.to_proto());
2812                    current_new_entry = new_statuses.next();
2813                }
2814                (None, None) => break,
2815            }
2816        }
2817
2818        proto::UpdateRepository {
2819            branch_summary: self.branch.as_ref().map(branch_to_proto),
2820            head_commit_details: self.head_commit.as_ref().map(commit_details_to_proto),
2821            updated_statuses,
2822            removed_statuses,
2823            current_merge_conflicts: self
2824                .merge
2825                .conflicted_paths
2826                .iter()
2827                .map(|path| path.as_ref().to_proto())
2828                .collect(),
2829            merge_message: self.merge.message.as_ref().map(|msg| msg.to_string()),
2830            project_id,
2831            id: self.id.to_proto(),
2832            abs_path: self.work_directory_abs_path.to_proto(),
2833            entry_ids: vec![],
2834            scan_id: self.scan_id,
2835            is_last_update: true,
2836        }
2837    }
2838
2839    pub fn status(&self) -> impl Iterator<Item = StatusEntry> + '_ {
2840        self.statuses_by_path.iter().cloned()
2841    }
2842
2843    pub fn status_summary(&self) -> GitSummary {
2844        self.statuses_by_path.summary().item_summary
2845    }
2846
2847    pub fn status_for_path(&self, path: &RepoPath) -> Option<StatusEntry> {
2848        self.statuses_by_path
2849            .get(&PathKey(path.0.clone()), &())
2850            .cloned()
2851    }
2852
2853    pub fn abs_path_to_repo_path(&self, abs_path: &Path) -> Option<RepoPath> {
2854        Self::abs_path_to_repo_path_inner(&self.work_directory_abs_path, abs_path)
2855    }
2856
2857    #[inline]
2858    fn abs_path_to_repo_path_inner(
2859        work_directory_abs_path: &Path,
2860        abs_path: &Path,
2861    ) -> Option<RepoPath> {
2862        abs_path
2863            .strip_prefix(&work_directory_abs_path)
2864            .map(RepoPath::from)
2865            .ok()
2866    }
2867
2868    pub fn had_conflict_on_last_merge_head_change(&self, repo_path: &RepoPath) -> bool {
2869        self.merge.conflicted_paths.contains(repo_path)
2870    }
2871
2872    pub fn has_conflict(&self, repo_path: &RepoPath) -> bool {
2873        let had_conflict_on_last_merge_head_change =
2874            self.merge.conflicted_paths.contains(repo_path);
2875        let has_conflict_currently = self
2876            .status_for_path(repo_path)
2877            .is_some_and(|entry| entry.status.is_conflicted());
2878        had_conflict_on_last_merge_head_change || has_conflict_currently
2879    }
2880
2881    /// This is the name that will be displayed in the repository selector for this repository.
2882    pub fn display_name(&self) -> SharedString {
2883        self.work_directory_abs_path
2884            .file_name()
2885            .unwrap_or_default()
2886            .to_string_lossy()
2887            .to_string()
2888            .into()
2889    }
2890}
2891
2892impl MergeDetails {
2893    async fn load(
2894        backend: &Arc<dyn GitRepository>,
2895        status: &SumTree<StatusEntry>,
2896        prev_snapshot: &RepositorySnapshot,
2897    ) -> Result<(MergeDetails, bool)> {
2898        log::debug!("load merge details");
2899        let message = backend.merge_message().await;
2900        let heads = backend
2901            .revparse_batch(vec![
2902                "MERGE_HEAD".into(),
2903                "CHERRY_PICK_HEAD".into(),
2904                "REBASE_HEAD".into(),
2905                "REVERT_HEAD".into(),
2906                "APPLY_HEAD".into(),
2907            ])
2908            .await
2909            .log_err()
2910            .unwrap_or_default()
2911            .into_iter()
2912            .map(|opt| opt.map(SharedString::from))
2913            .collect::<Vec<_>>();
2914        let merge_heads_changed = heads != prev_snapshot.merge.heads;
2915        let conflicted_paths = if merge_heads_changed {
2916            let current_conflicted_paths = TreeSet::from_ordered_entries(
2917                status
2918                    .iter()
2919                    .filter(|entry| entry.status.is_conflicted())
2920                    .map(|entry| entry.repo_path.clone()),
2921            );
2922
2923            // It can happen that we run a scan while a lengthy merge is in progress
2924            // that will eventually result in conflicts, but before those conflicts
2925            // are reported by `git status`. Since for the moment we only care about
2926            // the merge heads state for the purposes of tracking conflicts, don't update
2927            // this state until we see some conflicts.
2928            if heads.iter().any(Option::is_some)
2929                && !prev_snapshot.merge.heads.iter().any(Option::is_some)
2930                && current_conflicted_paths.is_empty()
2931            {
2932                log::debug!("not updating merge heads because no conflicts found");
2933                return Ok((
2934                    MergeDetails {
2935                        message: message.map(SharedString::from),
2936                        ..prev_snapshot.merge.clone()
2937                    },
2938                    false,
2939                ));
2940            }
2941
2942            current_conflicted_paths
2943        } else {
2944            prev_snapshot.merge.conflicted_paths.clone()
2945        };
2946        let details = MergeDetails {
2947            conflicted_paths,
2948            message: message.map(SharedString::from),
2949            heads,
2950        };
2951        Ok((details, merge_heads_changed))
2952    }
2953}
2954
2955impl Repository {
2956    pub fn snapshot(&self) -> RepositorySnapshot {
2957        self.snapshot.clone()
2958    }
2959
2960    fn local(
2961        id: RepositoryId,
2962        work_directory_abs_path: Arc<Path>,
2963        dot_git_abs_path: Arc<Path>,
2964        repository_dir_abs_path: Arc<Path>,
2965        common_dir_abs_path: Arc<Path>,
2966        project_environment: WeakEntity<ProjectEnvironment>,
2967        fs: Arc<dyn Fs>,
2968        git_store: WeakEntity<GitStore>,
2969        cx: &mut Context<Self>,
2970    ) -> Self {
2971        let snapshot = RepositorySnapshot::empty(id, work_directory_abs_path.clone());
2972        Repository {
2973            this: cx.weak_entity(),
2974            git_store,
2975            snapshot,
2976            commit_message_buffer: None,
2977            askpass_delegates: Default::default(),
2978            paths_needing_status_update: Default::default(),
2979            latest_askpass_id: 0,
2980            job_sender: Repository::spawn_local_git_worker(
2981                work_directory_abs_path,
2982                dot_git_abs_path,
2983                repository_dir_abs_path,
2984                common_dir_abs_path,
2985                project_environment,
2986                fs,
2987                cx,
2988            ),
2989            job_id: 0,
2990            active_jobs: Default::default(),
2991        }
2992    }
2993
2994    fn remote(
2995        id: RepositoryId,
2996        work_directory_abs_path: Arc<Path>,
2997        project_id: ProjectId,
2998        client: AnyProtoClient,
2999        git_store: WeakEntity<GitStore>,
3000        cx: &mut Context<Self>,
3001    ) -> Self {
3002        let snapshot = RepositorySnapshot::empty(id, work_directory_abs_path);
3003        Self {
3004            this: cx.weak_entity(),
3005            snapshot,
3006            commit_message_buffer: None,
3007            git_store,
3008            paths_needing_status_update: Default::default(),
3009            job_sender: Self::spawn_remote_git_worker(project_id, client, cx),
3010            askpass_delegates: Default::default(),
3011            latest_askpass_id: 0,
3012            active_jobs: Default::default(),
3013            job_id: 0,
3014        }
3015    }
3016
3017    pub fn git_store(&self) -> Option<Entity<GitStore>> {
3018        self.git_store.upgrade()
3019    }
3020
3021    fn reload_buffer_diff_bases(&mut self, cx: &mut Context<Self>) {
3022        let this = cx.weak_entity();
3023        let git_store = self.git_store.clone();
3024        let _ = self.send_keyed_job(
3025            Some(GitJobKey::ReloadBufferDiffBases),
3026            None,
3027            |state, mut cx| async move {
3028                let RepositoryState::Local { backend, .. } = state else {
3029                    log::error!("tried to recompute diffs for a non-local repository");
3030                    return Ok(());
3031                };
3032
3033                let Some(this) = this.upgrade() else {
3034                    return Ok(());
3035                };
3036
3037                let repo_diff_state_updates = this.update(&mut cx, |this, cx| {
3038                    git_store.update(cx, |git_store, cx| {
3039                        git_store
3040                            .diffs
3041                            .iter()
3042                            .filter_map(|(buffer_id, diff_state)| {
3043                                let buffer_store = git_store.buffer_store.read(cx);
3044                                let buffer = buffer_store.get(*buffer_id)?;
3045                                let file = File::from_dyn(buffer.read(cx).file())?;
3046                                let abs_path =
3047                                    file.worktree.read(cx).absolutize(&file.path).ok()?;
3048                                let repo_path = this.abs_path_to_repo_path(&abs_path)?;
3049                                log::debug!(
3050                                    "start reload diff bases for repo path {}",
3051                                    repo_path.0.display()
3052                                );
3053                                diff_state.update(cx, |diff_state, _| {
3054                                    let has_unstaged_diff = diff_state
3055                                        .unstaged_diff
3056                                        .as_ref()
3057                                        .is_some_and(|diff| diff.is_upgradable());
3058                                    let has_uncommitted_diff = diff_state
3059                                        .uncommitted_diff
3060                                        .as_ref()
3061                                        .is_some_and(|set| set.is_upgradable());
3062
3063                                    Some((
3064                                        buffer,
3065                                        repo_path,
3066                                        has_unstaged_diff.then(|| diff_state.index_text.clone()),
3067                                        has_uncommitted_diff.then(|| diff_state.head_text.clone()),
3068                                    ))
3069                                })
3070                            })
3071                            .collect::<Vec<_>>()
3072                    })
3073                })??;
3074
3075                let buffer_diff_base_changes = cx
3076                    .background_spawn(async move {
3077                        let mut changes = Vec::new();
3078                        for (buffer, repo_path, current_index_text, current_head_text) in
3079                            &repo_diff_state_updates
3080                        {
3081                            let index_text = if current_index_text.is_some() {
3082                                backend.load_index_text(repo_path.clone()).await
3083                            } else {
3084                                None
3085                            };
3086                            let head_text = if current_head_text.is_some() {
3087                                backend.load_committed_text(repo_path.clone()).await
3088                            } else {
3089                                None
3090                            };
3091
3092                            let change =
3093                                match (current_index_text.as_ref(), current_head_text.as_ref()) {
3094                                    (Some(current_index), Some(current_head)) => {
3095                                        let index_changed =
3096                                            index_text.as_ref() != current_index.as_deref();
3097                                        let head_changed =
3098                                            head_text.as_ref() != current_head.as_deref();
3099                                        if index_changed && head_changed {
3100                                            if index_text == head_text {
3101                                                Some(DiffBasesChange::SetBoth(head_text))
3102                                            } else {
3103                                                Some(DiffBasesChange::SetEach {
3104                                                    index: index_text,
3105                                                    head: head_text,
3106                                                })
3107                                            }
3108                                        } else if index_changed {
3109                                            Some(DiffBasesChange::SetIndex(index_text))
3110                                        } else if head_changed {
3111                                            Some(DiffBasesChange::SetHead(head_text))
3112                                        } else {
3113                                            None
3114                                        }
3115                                    }
3116                                    (Some(current_index), None) => {
3117                                        let index_changed =
3118                                            index_text.as_ref() != current_index.as_deref();
3119                                        index_changed
3120                                            .then_some(DiffBasesChange::SetIndex(index_text))
3121                                    }
3122                                    (None, Some(current_head)) => {
3123                                        let head_changed =
3124                                            head_text.as_ref() != current_head.as_deref();
3125                                        head_changed.then_some(DiffBasesChange::SetHead(head_text))
3126                                    }
3127                                    (None, None) => None,
3128                                };
3129
3130                            changes.push((buffer.clone(), change))
3131                        }
3132                        changes
3133                    })
3134                    .await;
3135
3136                git_store.update(&mut cx, |git_store, cx| {
3137                    for (buffer, diff_bases_change) in buffer_diff_base_changes {
3138                        let buffer_snapshot = buffer.read(cx).text_snapshot();
3139                        let buffer_id = buffer_snapshot.remote_id();
3140                        let Some(diff_state) = git_store.diffs.get(&buffer_id) else {
3141                            continue;
3142                        };
3143
3144                        let downstream_client = git_store.downstream_client();
3145                        diff_state.update(cx, |diff_state, cx| {
3146                            use proto::update_diff_bases::Mode;
3147
3148                            if let Some((diff_bases_change, (client, project_id))) =
3149                                diff_bases_change.clone().zip(downstream_client)
3150                            {
3151                                let (staged_text, committed_text, mode) = match diff_bases_change {
3152                                    DiffBasesChange::SetIndex(index) => {
3153                                        (index, None, Mode::IndexOnly)
3154                                    }
3155                                    DiffBasesChange::SetHead(head) => (None, head, Mode::HeadOnly),
3156                                    DiffBasesChange::SetEach { index, head } => {
3157                                        (index, head, Mode::IndexAndHead)
3158                                    }
3159                                    DiffBasesChange::SetBoth(text) => {
3160                                        (None, text, Mode::IndexMatchesHead)
3161                                    }
3162                                };
3163                                client
3164                                    .send(proto::UpdateDiffBases {
3165                                        project_id: project_id.to_proto(),
3166                                        buffer_id: buffer_id.to_proto(),
3167                                        staged_text,
3168                                        committed_text,
3169                                        mode: mode as i32,
3170                                    })
3171                                    .log_err();
3172                            }
3173
3174                            diff_state.diff_bases_changed(buffer_snapshot, diff_bases_change, cx);
3175                        });
3176                    }
3177                })
3178            },
3179        );
3180    }
3181
3182    pub fn send_job<F, Fut, R>(
3183        &mut self,
3184        status: Option<SharedString>,
3185        job: F,
3186    ) -> oneshot::Receiver<R>
3187    where
3188        F: FnOnce(RepositoryState, AsyncApp) -> Fut + 'static,
3189        Fut: Future<Output = R> + 'static,
3190        R: Send + 'static,
3191    {
3192        self.send_keyed_job(None, status, job)
3193    }
3194
3195    fn send_keyed_job<F, Fut, R>(
3196        &mut self,
3197        key: Option<GitJobKey>,
3198        status: Option<SharedString>,
3199        job: F,
3200    ) -> oneshot::Receiver<R>
3201    where
3202        F: FnOnce(RepositoryState, AsyncApp) -> Fut + 'static,
3203        Fut: Future<Output = R> + 'static,
3204        R: Send + 'static,
3205    {
3206        let (result_tx, result_rx) = futures::channel::oneshot::channel();
3207        let job_id = post_inc(&mut self.job_id);
3208        let this = self.this.clone();
3209        self.job_sender
3210            .unbounded_send(GitJob {
3211                key,
3212                job: Box::new(move |state, cx: &mut AsyncApp| {
3213                    let job = job(state, cx.clone());
3214                    cx.spawn(async move |cx| {
3215                        if let Some(s) = status.clone() {
3216                            this.update(cx, |this, cx| {
3217                                this.active_jobs.insert(
3218                                    job_id,
3219                                    JobInfo {
3220                                        start: Instant::now(),
3221                                        message: s.clone(),
3222                                    },
3223                                );
3224
3225                                cx.notify();
3226                            })
3227                            .ok();
3228                        }
3229                        let result = job.await;
3230
3231                        this.update(cx, |this, cx| {
3232                            this.active_jobs.remove(&job_id);
3233                            cx.notify();
3234                        })
3235                        .ok();
3236
3237                        result_tx.send(result).ok();
3238                    })
3239                }),
3240            })
3241            .ok();
3242        result_rx
3243    }
3244
3245    pub fn set_as_active_repository(&self, cx: &mut Context<Self>) {
3246        let Some(git_store) = self.git_store.upgrade() else {
3247            return;
3248        };
3249        let entity = cx.entity();
3250        git_store.update(cx, |git_store, cx| {
3251            let Some((&id, _)) = git_store
3252                .repositories
3253                .iter()
3254                .find(|(_, handle)| *handle == &entity)
3255            else {
3256                return;
3257            };
3258            git_store.active_repo_id = Some(id);
3259            cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
3260        });
3261    }
3262
3263    pub fn cached_status(&self) -> impl '_ + Iterator<Item = StatusEntry> {
3264        self.snapshot.status()
3265    }
3266
3267    pub fn repo_path_to_project_path(&self, path: &RepoPath, cx: &App) -> Option<ProjectPath> {
3268        let git_store = self.git_store.upgrade()?;
3269        let worktree_store = git_store.read(cx).worktree_store.read(cx);
3270        let abs_path = self.snapshot.work_directory_abs_path.join(&path.0);
3271        let (worktree, relative_path) = worktree_store.find_worktree(abs_path, cx)?;
3272        Some(ProjectPath {
3273            worktree_id: worktree.read(cx).id(),
3274            path: relative_path.into(),
3275        })
3276    }
3277
3278    pub fn project_path_to_repo_path(&self, path: &ProjectPath, cx: &App) -> Option<RepoPath> {
3279        let git_store = self.git_store.upgrade()?;
3280        let worktree_store = git_store.read(cx).worktree_store.read(cx);
3281        let abs_path = worktree_store.absolutize(path, cx)?;
3282        self.snapshot.abs_path_to_repo_path(&abs_path)
3283    }
3284
3285    pub fn contains_sub_repo(&self, other: &Entity<Self>, cx: &App) -> bool {
3286        other
3287            .read(cx)
3288            .snapshot
3289            .work_directory_abs_path
3290            .starts_with(&self.snapshot.work_directory_abs_path)
3291    }
3292
3293    pub fn open_commit_buffer(
3294        &mut self,
3295        languages: Option<Arc<LanguageRegistry>>,
3296        buffer_store: Entity<BufferStore>,
3297        cx: &mut Context<Self>,
3298    ) -> Task<Result<Entity<Buffer>>> {
3299        let id = self.id;
3300        if let Some(buffer) = self.commit_message_buffer.clone() {
3301            return Task::ready(Ok(buffer));
3302        }
3303        let this = cx.weak_entity();
3304
3305        let rx = self.send_job(None, move |state, mut cx| async move {
3306            let Some(this) = this.upgrade() else {
3307                bail!("git store was dropped");
3308            };
3309            match state {
3310                RepositoryState::Local { .. } => {
3311                    this.update(&mut cx, |_, cx| {
3312                        Self::open_local_commit_buffer(languages, buffer_store, cx)
3313                    })?
3314                    .await
3315                }
3316                RepositoryState::Remote { project_id, client } => {
3317                    let request = client.request(proto::OpenCommitMessageBuffer {
3318                        project_id: project_id.0,
3319                        repository_id: id.to_proto(),
3320                    });
3321                    let response = request.await.context("requesting to open commit buffer")?;
3322                    let buffer_id = BufferId::new(response.buffer_id)?;
3323                    let buffer = buffer_store
3324                        .update(&mut cx, |buffer_store, cx| {
3325                            buffer_store.wait_for_remote_buffer(buffer_id, cx)
3326                        })?
3327                        .await?;
3328                    if let Some(language_registry) = languages {
3329                        let git_commit_language =
3330                            language_registry.language_for_name("Git Commit").await?;
3331                        buffer.update(&mut cx, |buffer, cx| {
3332                            buffer.set_language(Some(git_commit_language), cx);
3333                        })?;
3334                    }
3335                    this.update(&mut cx, |this, _| {
3336                        this.commit_message_buffer = Some(buffer.clone());
3337                    })?;
3338                    Ok(buffer)
3339                }
3340            }
3341        });
3342
3343        cx.spawn(|_, _: &mut AsyncApp| async move { rx.await? })
3344    }
3345
3346    fn open_local_commit_buffer(
3347        language_registry: Option<Arc<LanguageRegistry>>,
3348        buffer_store: Entity<BufferStore>,
3349        cx: &mut Context<Self>,
3350    ) -> Task<Result<Entity<Buffer>>> {
3351        cx.spawn(async move |repository, cx| {
3352            let buffer = buffer_store
3353                .update(cx, |buffer_store, cx| buffer_store.create_buffer(cx))?
3354                .await?;
3355
3356            if let Some(language_registry) = language_registry {
3357                let git_commit_language = language_registry.language_for_name("Git Commit").await?;
3358                buffer.update(cx, |buffer, cx| {
3359                    buffer.set_language(Some(git_commit_language), cx);
3360                })?;
3361            }
3362
3363            repository.update(cx, |repository, _| {
3364                repository.commit_message_buffer = Some(buffer.clone());
3365            })?;
3366            Ok(buffer)
3367        })
3368    }
3369
3370    pub fn checkout_files(
3371        &mut self,
3372        commit: &str,
3373        paths: Vec<RepoPath>,
3374        _cx: &mut App,
3375    ) -> oneshot::Receiver<Result<()>> {
3376        let commit = commit.to_string();
3377        let id = self.id;
3378
3379        self.send_job(
3380            Some(format!("git checkout {}", commit).into()),
3381            move |git_repo, _| async move {
3382                match git_repo {
3383                    RepositoryState::Local {
3384                        backend,
3385                        environment,
3386                        ..
3387                    } => {
3388                        backend
3389                            .checkout_files(commit, paths, environment.clone())
3390                            .await
3391                    }
3392                    RepositoryState::Remote { project_id, client } => {
3393                        client
3394                            .request(proto::GitCheckoutFiles {
3395                                project_id: project_id.0,
3396                                repository_id: id.to_proto(),
3397                                commit,
3398                                paths: paths
3399                                    .into_iter()
3400                                    .map(|p| p.to_string_lossy().to_string())
3401                                    .collect(),
3402                            })
3403                            .await?;
3404
3405                        Ok(())
3406                    }
3407                }
3408            },
3409        )
3410    }
3411
3412    pub fn reset(
3413        &mut self,
3414        commit: String,
3415        reset_mode: ResetMode,
3416        _cx: &mut App,
3417    ) -> oneshot::Receiver<Result<()>> {
3418        let id = self.id;
3419
3420        self.send_job(None, move |git_repo, _| async move {
3421            match git_repo {
3422                RepositoryState::Local {
3423                    backend,
3424                    environment,
3425                    ..
3426                } => backend.reset(commit, reset_mode, environment).await,
3427                RepositoryState::Remote { project_id, client } => {
3428                    client
3429                        .request(proto::GitReset {
3430                            project_id: project_id.0,
3431                            repository_id: id.to_proto(),
3432                            commit,
3433                            mode: match reset_mode {
3434                                ResetMode::Soft => git_reset::ResetMode::Soft.into(),
3435                                ResetMode::Mixed => git_reset::ResetMode::Mixed.into(),
3436                            },
3437                        })
3438                        .await?;
3439
3440                    Ok(())
3441                }
3442            }
3443        })
3444    }
3445
3446    pub fn show(&mut self, commit: String) -> oneshot::Receiver<Result<CommitDetails>> {
3447        let id = self.id;
3448        self.send_job(None, move |git_repo, _cx| async move {
3449            match git_repo {
3450                RepositoryState::Local { backend, .. } => backend.show(commit).await,
3451                RepositoryState::Remote { project_id, client } => {
3452                    let resp = client
3453                        .request(proto::GitShow {
3454                            project_id: project_id.0,
3455                            repository_id: id.to_proto(),
3456                            commit,
3457                        })
3458                        .await?;
3459
3460                    Ok(CommitDetails {
3461                        sha: resp.sha.into(),
3462                        message: resp.message.into(),
3463                        commit_timestamp: resp.commit_timestamp,
3464                        author_email: resp.author_email.into(),
3465                        author_name: resp.author_name.into(),
3466                    })
3467                }
3468            }
3469        })
3470    }
3471
3472    pub fn load_commit_diff(&mut self, commit: String) -> oneshot::Receiver<Result<CommitDiff>> {
3473        let id = self.id;
3474        self.send_job(None, move |git_repo, cx| async move {
3475            match git_repo {
3476                RepositoryState::Local { backend, .. } => backend.load_commit(commit, cx).await,
3477                RepositoryState::Remote {
3478                    client, project_id, ..
3479                } => {
3480                    let response = client
3481                        .request(proto::LoadCommitDiff {
3482                            project_id: project_id.0,
3483                            repository_id: id.to_proto(),
3484                            commit,
3485                        })
3486                        .await?;
3487                    Ok(CommitDiff {
3488                        files: response
3489                            .files
3490                            .into_iter()
3491                            .map(|file| CommitFile {
3492                                path: Path::new(&file.path).into(),
3493                                old_text: file.old_text,
3494                                new_text: file.new_text,
3495                            })
3496                            .collect(),
3497                    })
3498                }
3499            }
3500        })
3501    }
3502
3503    fn buffer_store(&self, cx: &App) -> Option<Entity<BufferStore>> {
3504        Some(self.git_store.upgrade()?.read(cx).buffer_store.clone())
3505    }
3506
3507    pub fn stage_entries(
3508        &self,
3509        entries: Vec<RepoPath>,
3510        cx: &mut Context<Self>,
3511    ) -> Task<anyhow::Result<()>> {
3512        if entries.is_empty() {
3513            return Task::ready(Ok(()));
3514        }
3515        let id = self.id;
3516
3517        let mut save_futures = Vec::new();
3518        if let Some(buffer_store) = self.buffer_store(cx) {
3519            buffer_store.update(cx, |buffer_store, cx| {
3520                for path in &entries {
3521                    let Some(project_path) = self.repo_path_to_project_path(path, cx) else {
3522                        continue;
3523                    };
3524                    if let Some(buffer) = buffer_store.get_by_path(&project_path)
3525                        && buffer
3526                            .read(cx)
3527                            .file()
3528                            .is_some_and(|file| file.disk_state().exists())
3529                    {
3530                        save_futures.push(buffer_store.save_buffer(buffer, cx));
3531                    }
3532                }
3533            })
3534        }
3535
3536        cx.spawn(async move |this, cx| {
3537            for save_future in save_futures {
3538                save_future.await?;
3539            }
3540
3541            this.update(cx, |this, _| {
3542                this.send_job(None, move |git_repo, _cx| async move {
3543                    match git_repo {
3544                        RepositoryState::Local {
3545                            backend,
3546                            environment,
3547                            ..
3548                        } => backend.stage_paths(entries, environment.clone()).await,
3549                        RepositoryState::Remote { project_id, client } => {
3550                            client
3551                                .request(proto::Stage {
3552                                    project_id: project_id.0,
3553                                    repository_id: id.to_proto(),
3554                                    paths: entries
3555                                        .into_iter()
3556                                        .map(|repo_path| repo_path.as_ref().to_proto())
3557                                        .collect(),
3558                                })
3559                                .await
3560                                .context("sending stage request")?;
3561
3562                            Ok(())
3563                        }
3564                    }
3565                })
3566            })?
3567            .await??;
3568
3569            Ok(())
3570        })
3571    }
3572
3573    pub fn unstage_entries(
3574        &self,
3575        entries: Vec<RepoPath>,
3576        cx: &mut Context<Self>,
3577    ) -> Task<anyhow::Result<()>> {
3578        if entries.is_empty() {
3579            return Task::ready(Ok(()));
3580        }
3581        let id = self.id;
3582
3583        let mut save_futures = Vec::new();
3584        if let Some(buffer_store) = self.buffer_store(cx) {
3585            buffer_store.update(cx, |buffer_store, cx| {
3586                for path in &entries {
3587                    let Some(project_path) = self.repo_path_to_project_path(path, cx) else {
3588                        continue;
3589                    };
3590                    if let Some(buffer) = buffer_store.get_by_path(&project_path)
3591                        && buffer
3592                            .read(cx)
3593                            .file()
3594                            .is_some_and(|file| file.disk_state().exists())
3595                    {
3596                        save_futures.push(buffer_store.save_buffer(buffer, cx));
3597                    }
3598                }
3599            })
3600        }
3601
3602        cx.spawn(async move |this, cx| {
3603            for save_future in save_futures {
3604                save_future.await?;
3605            }
3606
3607            this.update(cx, |this, _| {
3608                this.send_job(None, move |git_repo, _cx| async move {
3609                    match git_repo {
3610                        RepositoryState::Local {
3611                            backend,
3612                            environment,
3613                            ..
3614                        } => backend.unstage_paths(entries, environment).await,
3615                        RepositoryState::Remote { project_id, client } => {
3616                            client
3617                                .request(proto::Unstage {
3618                                    project_id: project_id.0,
3619                                    repository_id: id.to_proto(),
3620                                    paths: entries
3621                                        .into_iter()
3622                                        .map(|repo_path| repo_path.as_ref().to_proto())
3623                                        .collect(),
3624                                })
3625                                .await
3626                                .context("sending unstage request")?;
3627
3628                            Ok(())
3629                        }
3630                    }
3631                })
3632            })?
3633            .await??;
3634
3635            Ok(())
3636        })
3637    }
3638
3639    pub fn stage_all(&self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
3640        let to_stage = self
3641            .cached_status()
3642            .filter(|entry| !entry.status.staging().is_fully_staged())
3643            .map(|entry| entry.repo_path)
3644            .collect();
3645        self.stage_entries(to_stage, cx)
3646    }
3647
3648    pub fn unstage_all(&self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
3649        let to_unstage = self
3650            .cached_status()
3651            .filter(|entry| entry.status.staging().has_staged())
3652            .map(|entry| entry.repo_path)
3653            .collect();
3654        self.unstage_entries(to_unstage, cx)
3655    }
3656
3657    pub fn stash_all(&mut self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
3658        let to_stash = self.cached_status().map(|entry| entry.repo_path).collect();
3659
3660        self.stash_entries(to_stash, cx)
3661    }
3662
3663    pub fn stash_entries(
3664        &mut self,
3665        entries: Vec<RepoPath>,
3666        cx: &mut Context<Self>,
3667    ) -> Task<anyhow::Result<()>> {
3668        let id = self.id;
3669
3670        cx.spawn(async move |this, cx| {
3671            this.update(cx, |this, _| {
3672                this.send_job(None, move |git_repo, _cx| async move {
3673                    match git_repo {
3674                        RepositoryState::Local {
3675                            backend,
3676                            environment,
3677                            ..
3678                        } => backend.stash_paths(entries, environment).await,
3679                        RepositoryState::Remote { project_id, client } => {
3680                            client
3681                                .request(proto::Stash {
3682                                    project_id: project_id.0,
3683                                    repository_id: id.to_proto(),
3684                                    paths: entries
3685                                        .into_iter()
3686                                        .map(|repo_path| repo_path.as_ref().to_proto())
3687                                        .collect(),
3688                                })
3689                                .await
3690                                .context("sending stash request")?;
3691                            Ok(())
3692                        }
3693                    }
3694                })
3695            })?
3696            .await??;
3697            Ok(())
3698        })
3699    }
3700
3701    pub fn stash_pop(&mut self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
3702        let id = self.id;
3703        cx.spawn(async move |this, cx| {
3704            this.update(cx, |this, _| {
3705                this.send_job(None, move |git_repo, _cx| async move {
3706                    match git_repo {
3707                        RepositoryState::Local {
3708                            backend,
3709                            environment,
3710                            ..
3711                        } => backend.stash_pop(environment).await,
3712                        RepositoryState::Remote { project_id, client } => {
3713                            client
3714                                .request(proto::StashPop {
3715                                    project_id: project_id.0,
3716                                    repository_id: id.to_proto(),
3717                                })
3718                                .await
3719                                .context("sending stash pop request")?;
3720                            Ok(())
3721                        }
3722                    }
3723                })
3724            })?
3725            .await??;
3726            Ok(())
3727        })
3728    }
3729
3730    pub fn commit(
3731        &mut self,
3732        message: SharedString,
3733        name_and_email: Option<(SharedString, SharedString)>,
3734        options: CommitOptions,
3735        _cx: &mut App,
3736    ) -> oneshot::Receiver<Result<()>> {
3737        let id = self.id;
3738
3739        self.send_job(Some("git commit".into()), move |git_repo, _cx| async move {
3740            match git_repo {
3741                RepositoryState::Local {
3742                    backend,
3743                    environment,
3744                    ..
3745                } => {
3746                    backend
3747                        .commit(message, name_and_email, options, environment)
3748                        .await
3749                }
3750                RepositoryState::Remote { project_id, client } => {
3751                    let (name, email) = name_and_email.unzip();
3752                    client
3753                        .request(proto::Commit {
3754                            project_id: project_id.0,
3755                            repository_id: id.to_proto(),
3756                            message: String::from(message),
3757                            name: name.map(String::from),
3758                            email: email.map(String::from),
3759                            options: Some(proto::commit::CommitOptions {
3760                                amend: options.amend,
3761                                signoff: options.signoff,
3762                            }),
3763                        })
3764                        .await
3765                        .context("sending commit request")?;
3766
3767                    Ok(())
3768                }
3769            }
3770        })
3771    }
3772
3773    pub fn fetch(
3774        &mut self,
3775        fetch_options: FetchOptions,
3776        askpass: AskPassDelegate,
3777        _cx: &mut App,
3778    ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
3779        let askpass_delegates = self.askpass_delegates.clone();
3780        let askpass_id = util::post_inc(&mut self.latest_askpass_id);
3781        let id = self.id;
3782
3783        self.send_job(Some("git fetch".into()), move |git_repo, cx| async move {
3784            match git_repo {
3785                RepositoryState::Local {
3786                    backend,
3787                    environment,
3788                    ..
3789                } => backend.fetch(fetch_options, askpass, environment, cx).await,
3790                RepositoryState::Remote { project_id, client } => {
3791                    askpass_delegates.lock().insert(askpass_id, askpass);
3792                    let _defer = util::defer(|| {
3793                        let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
3794                        debug_assert!(askpass_delegate.is_some());
3795                    });
3796
3797                    let response = client
3798                        .request(proto::Fetch {
3799                            project_id: project_id.0,
3800                            repository_id: id.to_proto(),
3801                            askpass_id,
3802                            remote: fetch_options.to_proto(),
3803                        })
3804                        .await
3805                        .context("sending fetch request")?;
3806
3807                    Ok(RemoteCommandOutput {
3808                        stdout: response.stdout,
3809                        stderr: response.stderr,
3810                    })
3811                }
3812            }
3813        })
3814    }
3815
3816    pub fn push(
3817        &mut self,
3818        branch: SharedString,
3819        remote: SharedString,
3820        options: Option<PushOptions>,
3821        askpass: AskPassDelegate,
3822        cx: &mut Context<Self>,
3823    ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
3824        let askpass_delegates = self.askpass_delegates.clone();
3825        let askpass_id = util::post_inc(&mut self.latest_askpass_id);
3826        let id = self.id;
3827
3828        let args = options
3829            .map(|option| match option {
3830                PushOptions::SetUpstream => " --set-upstream",
3831                PushOptions::Force => " --force-with-lease",
3832            })
3833            .unwrap_or("");
3834
3835        let updates_tx = self
3836            .git_store()
3837            .and_then(|git_store| match &git_store.read(cx).state {
3838                GitStoreState::Local { downstream, .. } => downstream
3839                    .as_ref()
3840                    .map(|downstream| downstream.updates_tx.clone()),
3841                _ => None,
3842            });
3843
3844        let this = cx.weak_entity();
3845        self.send_job(
3846            Some(format!("git push {} {} {}", args, branch, remote).into()),
3847            move |git_repo, mut cx| async move {
3848                match git_repo {
3849                    RepositoryState::Local {
3850                        backend,
3851                        environment,
3852                        ..
3853                    } => {
3854                        let result = backend
3855                            .push(
3856                                branch.to_string(),
3857                                remote.to_string(),
3858                                options,
3859                                askpass,
3860                                environment.clone(),
3861                                cx.clone(),
3862                            )
3863                            .await;
3864                        if result.is_ok() {
3865                            let branches = backend.branches().await?;
3866                            let branch = branches.into_iter().find(|branch| branch.is_head);
3867                            log::info!("head branch after scan is {branch:?}");
3868                            let snapshot = this.update(&mut cx, |this, cx| {
3869                                this.snapshot.branch = branch;
3870                                let snapshot = this.snapshot.clone();
3871                                cx.emit(RepositoryEvent::Updated {
3872                                    full_scan: false,
3873                                    new_instance: false,
3874                                });
3875                                snapshot
3876                            })?;
3877                            if let Some(updates_tx) = updates_tx {
3878                                updates_tx
3879                                    .unbounded_send(DownstreamUpdate::UpdateRepository(snapshot))
3880                                    .ok();
3881                            }
3882                        }
3883                        result
3884                    }
3885                    RepositoryState::Remote { project_id, client } => {
3886                        askpass_delegates.lock().insert(askpass_id, askpass);
3887                        let _defer = util::defer(|| {
3888                            let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
3889                            debug_assert!(askpass_delegate.is_some());
3890                        });
3891                        let response = client
3892                            .request(proto::Push {
3893                                project_id: project_id.0,
3894                                repository_id: id.to_proto(),
3895                                askpass_id,
3896                                branch_name: branch.to_string(),
3897                                remote_name: remote.to_string(),
3898                                options: options.map(|options| match options {
3899                                    PushOptions::Force => proto::push::PushOptions::Force,
3900                                    PushOptions::SetUpstream => {
3901                                        proto::push::PushOptions::SetUpstream
3902                                    }
3903                                }
3904                                    as i32),
3905                            })
3906                            .await
3907                            .context("sending push request")?;
3908
3909                        Ok(RemoteCommandOutput {
3910                            stdout: response.stdout,
3911                            stderr: response.stderr,
3912                        })
3913                    }
3914                }
3915            },
3916        )
3917    }
3918
3919    pub fn pull(
3920        &mut self,
3921        branch: SharedString,
3922        remote: SharedString,
3923        askpass: AskPassDelegate,
3924        _cx: &mut App,
3925    ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
3926        let askpass_delegates = self.askpass_delegates.clone();
3927        let askpass_id = util::post_inc(&mut self.latest_askpass_id);
3928        let id = self.id;
3929
3930        self.send_job(
3931            Some(format!("git pull {} {}", remote, branch).into()),
3932            move |git_repo, cx| async move {
3933                match git_repo {
3934                    RepositoryState::Local {
3935                        backend,
3936                        environment,
3937                        ..
3938                    } => {
3939                        backend
3940                            .pull(
3941                                branch.to_string(),
3942                                remote.to_string(),
3943                                askpass,
3944                                environment.clone(),
3945                                cx,
3946                            )
3947                            .await
3948                    }
3949                    RepositoryState::Remote { project_id, client } => {
3950                        askpass_delegates.lock().insert(askpass_id, askpass);
3951                        let _defer = util::defer(|| {
3952                            let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
3953                            debug_assert!(askpass_delegate.is_some());
3954                        });
3955                        let response = client
3956                            .request(proto::Pull {
3957                                project_id: project_id.0,
3958                                repository_id: id.to_proto(),
3959                                askpass_id,
3960                                branch_name: branch.to_string(),
3961                                remote_name: remote.to_string(),
3962                            })
3963                            .await
3964                            .context("sending pull request")?;
3965
3966                        Ok(RemoteCommandOutput {
3967                            stdout: response.stdout,
3968                            stderr: response.stderr,
3969                        })
3970                    }
3971                }
3972            },
3973        )
3974    }
3975
3976    fn spawn_set_index_text_job(
3977        &mut self,
3978        path: RepoPath,
3979        content: Option<String>,
3980        hunk_staging_operation_count: Option<usize>,
3981        cx: &mut Context<Self>,
3982    ) -> oneshot::Receiver<anyhow::Result<()>> {
3983        let id = self.id;
3984        let this = cx.weak_entity();
3985        let git_store = self.git_store.clone();
3986        self.send_keyed_job(
3987            Some(GitJobKey::WriteIndex(path.clone())),
3988            None,
3989            move |git_repo, mut cx| async move {
3990                log::debug!("start updating index text for buffer {}", path.display());
3991                match git_repo {
3992                    RepositoryState::Local {
3993                        backend,
3994                        environment,
3995                        ..
3996                    } => {
3997                        backend
3998                            .set_index_text(path.clone(), content, environment.clone())
3999                            .await?;
4000                    }
4001                    RepositoryState::Remote { project_id, client } => {
4002                        client
4003                            .request(proto::SetIndexText {
4004                                project_id: project_id.0,
4005                                repository_id: id.to_proto(),
4006                                path: path.as_ref().to_proto(),
4007                                text: content,
4008                            })
4009                            .await?;
4010                    }
4011                }
4012                log::debug!("finish updating index text for buffer {}", path.display());
4013
4014                if let Some(hunk_staging_operation_count) = hunk_staging_operation_count {
4015                    let project_path = this
4016                        .read_with(&cx, |this, cx| this.repo_path_to_project_path(&path, cx))
4017                        .ok()
4018                        .flatten();
4019                    git_store.update(&mut cx, |git_store, cx| {
4020                        let buffer_id = git_store
4021                            .buffer_store
4022                            .read(cx)
4023                            .get_by_path(&project_path?)?
4024                            .read(cx)
4025                            .remote_id();
4026                        let diff_state = git_store.diffs.get(&buffer_id)?;
4027                        diff_state.update(cx, |diff_state, _| {
4028                            diff_state.hunk_staging_operation_count_as_of_write =
4029                                hunk_staging_operation_count;
4030                        });
4031                        Some(())
4032                    })?;
4033                }
4034                Ok(())
4035            },
4036        )
4037    }
4038
4039    pub fn get_remotes(
4040        &mut self,
4041        branch_name: Option<String>,
4042    ) -> oneshot::Receiver<Result<Vec<Remote>>> {
4043        let id = self.id;
4044        self.send_job(None, move |repo, _cx| async move {
4045            match repo {
4046                RepositoryState::Local { backend, .. } => backend.get_remotes(branch_name).await,
4047                RepositoryState::Remote { project_id, client } => {
4048                    let response = client
4049                        .request(proto::GetRemotes {
4050                            project_id: project_id.0,
4051                            repository_id: id.to_proto(),
4052                            branch_name,
4053                        })
4054                        .await?;
4055
4056                    let remotes = response
4057                        .remotes
4058                        .into_iter()
4059                        .map(|remotes| git::repository::Remote {
4060                            name: remotes.name.into(),
4061                        })
4062                        .collect();
4063
4064                    Ok(remotes)
4065                }
4066            }
4067        })
4068    }
4069
4070    pub fn branches(&mut self) -> oneshot::Receiver<Result<Vec<Branch>>> {
4071        let id = self.id;
4072        self.send_job(None, move |repo, _| async move {
4073            match repo {
4074                RepositoryState::Local { backend, .. } => backend.branches().await,
4075                RepositoryState::Remote { project_id, client } => {
4076                    let response = client
4077                        .request(proto::GitGetBranches {
4078                            project_id: project_id.0,
4079                            repository_id: id.to_proto(),
4080                        })
4081                        .await?;
4082
4083                    let branches = response
4084                        .branches
4085                        .into_iter()
4086                        .map(|branch| proto_to_branch(&branch))
4087                        .collect();
4088
4089                    Ok(branches)
4090                }
4091            }
4092        })
4093    }
4094
4095    pub fn default_branch(&mut self) -> oneshot::Receiver<Result<Option<SharedString>>> {
4096        let id = self.id;
4097        self.send_job(None, move |repo, _| async move {
4098            match repo {
4099                RepositoryState::Local { backend, .. } => backend.default_branch().await,
4100                RepositoryState::Remote { project_id, client } => {
4101                    let response = client
4102                        .request(proto::GetDefaultBranch {
4103                            project_id: project_id.0,
4104                            repository_id: id.to_proto(),
4105                        })
4106                        .await?;
4107
4108                    anyhow::Ok(response.branch.map(SharedString::from))
4109                }
4110            }
4111        })
4112    }
4113
4114    pub fn diff(&mut self, diff_type: DiffType, _cx: &App) -> oneshot::Receiver<Result<String>> {
4115        let id = self.id;
4116        self.send_job(None, move |repo, _cx| async move {
4117            match repo {
4118                RepositoryState::Local { backend, .. } => backend.diff(diff_type).await,
4119                RepositoryState::Remote { project_id, client } => {
4120                    let response = client
4121                        .request(proto::GitDiff {
4122                            project_id: project_id.0,
4123                            repository_id: id.to_proto(),
4124                            diff_type: match diff_type {
4125                                DiffType::HeadToIndex => {
4126                                    proto::git_diff::DiffType::HeadToIndex.into()
4127                                }
4128                                DiffType::HeadToWorktree => {
4129                                    proto::git_diff::DiffType::HeadToWorktree.into()
4130                                }
4131                            },
4132                        })
4133                        .await?;
4134
4135                    Ok(response.diff)
4136                }
4137            }
4138        })
4139    }
4140
4141    pub fn create_branch(&mut self, branch_name: String) -> oneshot::Receiver<Result<()>> {
4142        let id = self.id;
4143        self.send_job(
4144            Some(format!("git switch -c {branch_name}").into()),
4145            move |repo, _cx| async move {
4146                match repo {
4147                    RepositoryState::Local { backend, .. } => {
4148                        backend.create_branch(branch_name).await
4149                    }
4150                    RepositoryState::Remote { project_id, client } => {
4151                        client
4152                            .request(proto::GitCreateBranch {
4153                                project_id: project_id.0,
4154                                repository_id: id.to_proto(),
4155                                branch_name,
4156                            })
4157                            .await?;
4158
4159                        Ok(())
4160                    }
4161                }
4162            },
4163        )
4164    }
4165
4166    pub fn change_branch(&mut self, branch_name: String) -> oneshot::Receiver<Result<()>> {
4167        let id = self.id;
4168        self.send_job(
4169            Some(format!("git switch {branch_name}").into()),
4170            move |repo, _cx| async move {
4171                match repo {
4172                    RepositoryState::Local { backend, .. } => {
4173                        backend.change_branch(branch_name).await
4174                    }
4175                    RepositoryState::Remote { project_id, client } => {
4176                        client
4177                            .request(proto::GitChangeBranch {
4178                                project_id: project_id.0,
4179                                repository_id: id.to_proto(),
4180                                branch_name,
4181                            })
4182                            .await?;
4183
4184                        Ok(())
4185                    }
4186                }
4187            },
4188        )
4189    }
4190
4191    pub fn check_for_pushed_commits(&mut self) -> oneshot::Receiver<Result<Vec<SharedString>>> {
4192        let id = self.id;
4193        self.send_job(None, move |repo, _cx| async move {
4194            match repo {
4195                RepositoryState::Local { backend, .. } => backend.check_for_pushed_commit().await,
4196                RepositoryState::Remote { project_id, client } => {
4197                    let response = client
4198                        .request(proto::CheckForPushedCommits {
4199                            project_id: project_id.0,
4200                            repository_id: id.to_proto(),
4201                        })
4202                        .await?;
4203
4204                    let branches = response.pushed_to.into_iter().map(Into::into).collect();
4205
4206                    Ok(branches)
4207                }
4208            }
4209        })
4210    }
4211
4212    pub fn checkpoint(&mut self) -> oneshot::Receiver<Result<GitRepositoryCheckpoint>> {
4213        self.send_job(None, |repo, _cx| async move {
4214            match repo {
4215                RepositoryState::Local { backend, .. } => backend.checkpoint().await,
4216                RepositoryState::Remote { .. } => anyhow::bail!("not implemented yet"),
4217            }
4218        })
4219    }
4220
4221    pub fn restore_checkpoint(
4222        &mut self,
4223        checkpoint: GitRepositoryCheckpoint,
4224    ) -> oneshot::Receiver<Result<()>> {
4225        self.send_job(None, move |repo, _cx| async move {
4226            match repo {
4227                RepositoryState::Local { backend, .. } => {
4228                    backend.restore_checkpoint(checkpoint).await
4229                }
4230                RepositoryState::Remote { .. } => anyhow::bail!("not implemented yet"),
4231            }
4232        })
4233    }
4234
4235    pub(crate) fn apply_remote_update(
4236        &mut self,
4237        update: proto::UpdateRepository,
4238        is_new: bool,
4239        cx: &mut Context<Self>,
4240    ) -> Result<()> {
4241        let conflicted_paths = TreeSet::from_ordered_entries(
4242            update
4243                .current_merge_conflicts
4244                .into_iter()
4245                .map(|path| RepoPath(Path::new(&path).into())),
4246        );
4247        self.snapshot.branch = update.branch_summary.as_ref().map(proto_to_branch);
4248        self.snapshot.head_commit = update
4249            .head_commit_details
4250            .as_ref()
4251            .map(proto_to_commit_details);
4252
4253        self.snapshot.merge.conflicted_paths = conflicted_paths;
4254        self.snapshot.merge.message = update.merge_message.map(SharedString::from);
4255
4256        let edits = update
4257            .removed_statuses
4258            .into_iter()
4259            .map(|path| sum_tree::Edit::Remove(PathKey(FromProto::from_proto(path))))
4260            .chain(
4261                update
4262                    .updated_statuses
4263                    .into_iter()
4264                    .filter_map(|updated_status| {
4265                        Some(sum_tree::Edit::Insert(updated_status.try_into().log_err()?))
4266                    }),
4267            )
4268            .collect::<Vec<_>>();
4269        self.snapshot.statuses_by_path.edit(edits, &());
4270        if update.is_last_update {
4271            self.snapshot.scan_id = update.scan_id;
4272        }
4273        cx.emit(RepositoryEvent::Updated {
4274            full_scan: true,
4275            new_instance: is_new,
4276        });
4277        Ok(())
4278    }
4279
4280    pub fn compare_checkpoints(
4281        &mut self,
4282        left: GitRepositoryCheckpoint,
4283        right: GitRepositoryCheckpoint,
4284    ) -> oneshot::Receiver<Result<bool>> {
4285        self.send_job(None, move |repo, _cx| async move {
4286            match repo {
4287                RepositoryState::Local { backend, .. } => {
4288                    backend.compare_checkpoints(left, right).await
4289                }
4290                RepositoryState::Remote { .. } => anyhow::bail!("not implemented yet"),
4291            }
4292        })
4293    }
4294
4295    pub fn diff_checkpoints(
4296        &mut self,
4297        base_checkpoint: GitRepositoryCheckpoint,
4298        target_checkpoint: GitRepositoryCheckpoint,
4299    ) -> oneshot::Receiver<Result<String>> {
4300        self.send_job(None, move |repo, _cx| async move {
4301            match repo {
4302                RepositoryState::Local { backend, .. } => {
4303                    backend
4304                        .diff_checkpoints(base_checkpoint, target_checkpoint)
4305                        .await
4306                }
4307                RepositoryState::Remote { .. } => anyhow::bail!("not implemented yet"),
4308            }
4309        })
4310    }
4311
4312    fn schedule_scan(
4313        &mut self,
4314        updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
4315        cx: &mut Context<Self>,
4316    ) {
4317        let this = cx.weak_entity();
4318        let _ = self.send_keyed_job(
4319            Some(GitJobKey::ReloadGitState),
4320            None,
4321            |state, mut cx| async move {
4322                log::debug!("run scheduled git status scan");
4323
4324                let Some(this) = this.upgrade() else {
4325                    return Ok(());
4326                };
4327                let RepositoryState::Local { backend, .. } = state else {
4328                    bail!("not a local repository")
4329                };
4330                let (snapshot, events) = this
4331                    .update(&mut cx, |this, _| {
4332                        this.paths_needing_status_update.clear();
4333                        compute_snapshot(
4334                            this.id,
4335                            this.work_directory_abs_path.clone(),
4336                            this.snapshot.clone(),
4337                            backend.clone(),
4338                        )
4339                    })?
4340                    .await?;
4341                this.update(&mut cx, |this, cx| {
4342                    this.snapshot = snapshot.clone();
4343                    for event in events {
4344                        cx.emit(event);
4345                    }
4346                })?;
4347                if let Some(updates_tx) = updates_tx {
4348                    updates_tx
4349                        .unbounded_send(DownstreamUpdate::UpdateRepository(snapshot))
4350                        .ok();
4351                }
4352                Ok(())
4353            },
4354        );
4355    }
4356
4357    fn spawn_local_git_worker(
4358        work_directory_abs_path: Arc<Path>,
4359        dot_git_abs_path: Arc<Path>,
4360        _repository_dir_abs_path: Arc<Path>,
4361        _common_dir_abs_path: Arc<Path>,
4362        project_environment: WeakEntity<ProjectEnvironment>,
4363        fs: Arc<dyn Fs>,
4364        cx: &mut Context<Self>,
4365    ) -> mpsc::UnboundedSender<GitJob> {
4366        let (job_tx, mut job_rx) = mpsc::unbounded::<GitJob>();
4367
4368        cx.spawn(async move |_, cx| {
4369            let environment = project_environment
4370                .upgrade()
4371                .context("missing project environment")?
4372                .update(cx, |project_environment, cx| {
4373                    project_environment.get_directory_environment(work_directory_abs_path.clone(), cx)
4374                })?
4375                .await
4376                .unwrap_or_else(|| {
4377                    log::error!("failed to get working directory environment for repository {work_directory_abs_path:?}");
4378                    HashMap::default()
4379                });
4380            let backend = cx
4381                .background_spawn(async move {
4382                    fs.open_repo(&dot_git_abs_path)
4383                        .with_context(|| format!("opening repository at {dot_git_abs_path:?}"))
4384                })
4385                .await?;
4386
4387            if let Some(git_hosting_provider_registry) =
4388                cx.update(|cx| GitHostingProviderRegistry::try_global(cx))?
4389            {
4390                git_hosting_providers::register_additional_providers(
4391                    git_hosting_provider_registry,
4392                    backend.clone(),
4393                );
4394            }
4395
4396            let state = RepositoryState::Local {
4397                backend,
4398                environment: Arc::new(environment),
4399            };
4400            let mut jobs = VecDeque::new();
4401            loop {
4402                while let Ok(Some(next_job)) = job_rx.try_next() {
4403                    jobs.push_back(next_job);
4404                }
4405
4406                if let Some(job) = jobs.pop_front() {
4407                    if let Some(current_key) = &job.key
4408                        && jobs
4409                            .iter()
4410                            .any(|other_job| other_job.key.as_ref() == Some(current_key))
4411                        {
4412                            continue;
4413                        }
4414                    (job.job)(state.clone(), cx).await;
4415                } else if let Some(job) = job_rx.next().await {
4416                    jobs.push_back(job);
4417                } else {
4418                    break;
4419                }
4420            }
4421            anyhow::Ok(())
4422        })
4423        .detach_and_log_err(cx);
4424
4425        job_tx
4426    }
4427
4428    fn spawn_remote_git_worker(
4429        project_id: ProjectId,
4430        client: AnyProtoClient,
4431        cx: &mut Context<Self>,
4432    ) -> mpsc::UnboundedSender<GitJob> {
4433        let (job_tx, mut job_rx) = mpsc::unbounded::<GitJob>();
4434
4435        cx.spawn(async move |_, cx| {
4436            let state = RepositoryState::Remote { project_id, client };
4437            let mut jobs = VecDeque::new();
4438            loop {
4439                while let Ok(Some(next_job)) = job_rx.try_next() {
4440                    jobs.push_back(next_job);
4441                }
4442
4443                if let Some(job) = jobs.pop_front() {
4444                    if let Some(current_key) = &job.key
4445                        && jobs
4446                            .iter()
4447                            .any(|other_job| other_job.key.as_ref() == Some(current_key))
4448                    {
4449                        continue;
4450                    }
4451                    (job.job)(state.clone(), cx).await;
4452                } else if let Some(job) = job_rx.next().await {
4453                    jobs.push_back(job);
4454                } else {
4455                    break;
4456                }
4457            }
4458            anyhow::Ok(())
4459        })
4460        .detach_and_log_err(cx);
4461
4462        job_tx
4463    }
4464
4465    fn load_staged_text(
4466        &mut self,
4467        buffer_id: BufferId,
4468        repo_path: RepoPath,
4469        cx: &App,
4470    ) -> Task<Result<Option<String>>> {
4471        let rx = self.send_job(None, move |state, _| async move {
4472            match state {
4473                RepositoryState::Local { backend, .. } => {
4474                    anyhow::Ok(backend.load_index_text(repo_path).await)
4475                }
4476                RepositoryState::Remote { project_id, client } => {
4477                    let response = client
4478                        .request(proto::OpenUnstagedDiff {
4479                            project_id: project_id.to_proto(),
4480                            buffer_id: buffer_id.to_proto(),
4481                        })
4482                        .await?;
4483                    Ok(response.staged_text)
4484                }
4485            }
4486        });
4487        cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
4488    }
4489
4490    fn load_committed_text(
4491        &mut self,
4492        buffer_id: BufferId,
4493        repo_path: RepoPath,
4494        cx: &App,
4495    ) -> Task<Result<DiffBasesChange>> {
4496        let rx = self.send_job(None, move |state, _| async move {
4497            match state {
4498                RepositoryState::Local { backend, .. } => {
4499                    let committed_text = backend.load_committed_text(repo_path.clone()).await;
4500                    let staged_text = backend.load_index_text(repo_path).await;
4501                    let diff_bases_change = if committed_text == staged_text {
4502                        DiffBasesChange::SetBoth(committed_text)
4503                    } else {
4504                        DiffBasesChange::SetEach {
4505                            index: staged_text,
4506                            head: committed_text,
4507                        }
4508                    };
4509                    anyhow::Ok(diff_bases_change)
4510                }
4511                RepositoryState::Remote { project_id, client } => {
4512                    use proto::open_uncommitted_diff_response::Mode;
4513
4514                    let response = client
4515                        .request(proto::OpenUncommittedDiff {
4516                            project_id: project_id.to_proto(),
4517                            buffer_id: buffer_id.to_proto(),
4518                        })
4519                        .await?;
4520                    let mode = Mode::from_i32(response.mode).context("Invalid mode")?;
4521                    let bases = match mode {
4522                        Mode::IndexMatchesHead => DiffBasesChange::SetBoth(response.committed_text),
4523                        Mode::IndexAndHead => DiffBasesChange::SetEach {
4524                            head: response.committed_text,
4525                            index: response.staged_text,
4526                        },
4527                    };
4528                    Ok(bases)
4529                }
4530            }
4531        });
4532
4533        cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
4534    }
4535
4536    fn paths_changed(
4537        &mut self,
4538        paths: Vec<RepoPath>,
4539        updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
4540        cx: &mut Context<Self>,
4541    ) {
4542        self.paths_needing_status_update.extend(paths);
4543
4544        let this = cx.weak_entity();
4545        let _ = self.send_keyed_job(
4546            Some(GitJobKey::RefreshStatuses),
4547            None,
4548            |state, mut cx| async move {
4549                let (prev_snapshot, mut changed_paths) = this.update(&mut cx, |this, _| {
4550                    (
4551                        this.snapshot.clone(),
4552                        mem::take(&mut this.paths_needing_status_update),
4553                    )
4554                })?;
4555                let RepositoryState::Local { backend, .. } = state else {
4556                    bail!("not a local repository")
4557                };
4558
4559                let paths = changed_paths.iter().cloned().collect::<Vec<_>>();
4560                if paths.is_empty() {
4561                    return Ok(());
4562                }
4563                let statuses = backend.status(&paths).await?;
4564
4565                let changed_path_statuses = cx
4566                    .background_spawn(async move {
4567                        let mut changed_path_statuses = Vec::new();
4568                        let prev_statuses = prev_snapshot.statuses_by_path.clone();
4569                        let mut cursor = prev_statuses.cursor::<PathProgress>(&());
4570
4571                        for (repo_path, status) in &*statuses.entries {
4572                            changed_paths.remove(repo_path);
4573                            if cursor.seek_forward(&PathTarget::Path(repo_path), Bias::Left)
4574                                && cursor.item().is_some_and(|entry| entry.status == *status)
4575                            {
4576                                continue;
4577                            }
4578
4579                            changed_path_statuses.push(Edit::Insert(StatusEntry {
4580                                repo_path: repo_path.clone(),
4581                                status: *status,
4582                            }));
4583                        }
4584                        let mut cursor = prev_statuses.cursor::<PathProgress>(&());
4585                        for path in changed_paths.into_iter() {
4586                            if cursor.seek_forward(&PathTarget::Path(&path), Bias::Left) {
4587                                changed_path_statuses.push(Edit::Remove(PathKey(path.0)));
4588                            }
4589                        }
4590                        changed_path_statuses
4591                    })
4592                    .await;
4593
4594                this.update(&mut cx, |this, cx| {
4595                    if !changed_path_statuses.is_empty() {
4596                        this.snapshot
4597                            .statuses_by_path
4598                            .edit(changed_path_statuses, &());
4599                        this.snapshot.scan_id += 1;
4600                        if let Some(updates_tx) = updates_tx {
4601                            updates_tx
4602                                .unbounded_send(DownstreamUpdate::UpdateRepository(
4603                                    this.snapshot.clone(),
4604                                ))
4605                                .ok();
4606                        }
4607                    }
4608                    cx.emit(RepositoryEvent::Updated {
4609                        full_scan: false,
4610                        new_instance: false,
4611                    });
4612                })
4613            },
4614        );
4615    }
4616
4617    /// currently running git command and when it started
4618    pub fn current_job(&self) -> Option<JobInfo> {
4619        self.active_jobs.values().next().cloned()
4620    }
4621
4622    pub fn barrier(&mut self) -> oneshot::Receiver<()> {
4623        self.send_job(None, |_, _| async {})
4624    }
4625}
4626
4627fn get_permalink_in_rust_registry_src(
4628    provider_registry: Arc<GitHostingProviderRegistry>,
4629    path: PathBuf,
4630    selection: Range<u32>,
4631) -> Result<url::Url> {
4632    #[derive(Deserialize)]
4633    struct CargoVcsGit {
4634        sha1: String,
4635    }
4636
4637    #[derive(Deserialize)]
4638    struct CargoVcsInfo {
4639        git: CargoVcsGit,
4640        path_in_vcs: String,
4641    }
4642
4643    #[derive(Deserialize)]
4644    struct CargoPackage {
4645        repository: String,
4646    }
4647
4648    #[derive(Deserialize)]
4649    struct CargoToml {
4650        package: CargoPackage,
4651    }
4652
4653    let Some((dir, cargo_vcs_info_json)) = path.ancestors().skip(1).find_map(|dir| {
4654        let json = std::fs::read_to_string(dir.join(".cargo_vcs_info.json")).ok()?;
4655        Some((dir, json))
4656    }) else {
4657        bail!("No .cargo_vcs_info.json found in parent directories")
4658    };
4659    let cargo_vcs_info = serde_json::from_str::<CargoVcsInfo>(&cargo_vcs_info_json)?;
4660    let cargo_toml = std::fs::read_to_string(dir.join("Cargo.toml"))?;
4661    let manifest = toml::from_str::<CargoToml>(&cargo_toml)?;
4662    let (provider, remote) = parse_git_remote_url(provider_registry, &manifest.package.repository)
4663        .context("parsing package.repository field of manifest")?;
4664    let path = PathBuf::from(cargo_vcs_info.path_in_vcs).join(path.strip_prefix(dir).unwrap());
4665    let permalink = provider.build_permalink(
4666        remote,
4667        BuildPermalinkParams {
4668            sha: &cargo_vcs_info.git.sha1,
4669            path: &path.to_string_lossy(),
4670            selection: Some(selection),
4671        },
4672    );
4673    Ok(permalink)
4674}
4675
4676fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
4677    let Some(blame) = blame else {
4678        return proto::BlameBufferResponse {
4679            blame_response: None,
4680        };
4681    };
4682
4683    let entries = blame
4684        .entries
4685        .into_iter()
4686        .map(|entry| proto::BlameEntry {
4687            sha: entry.sha.as_bytes().into(),
4688            start_line: entry.range.start,
4689            end_line: entry.range.end,
4690            original_line_number: entry.original_line_number,
4691            author: entry.author,
4692            author_mail: entry.author_mail,
4693            author_time: entry.author_time,
4694            author_tz: entry.author_tz,
4695            committer: entry.committer_name,
4696            committer_mail: entry.committer_email,
4697            committer_time: entry.committer_time,
4698            committer_tz: entry.committer_tz,
4699            summary: entry.summary,
4700            previous: entry.previous,
4701            filename: entry.filename,
4702        })
4703        .collect::<Vec<_>>();
4704
4705    let messages = blame
4706        .messages
4707        .into_iter()
4708        .map(|(oid, message)| proto::CommitMessage {
4709            oid: oid.as_bytes().into(),
4710            message,
4711        })
4712        .collect::<Vec<_>>();
4713
4714    proto::BlameBufferResponse {
4715        blame_response: Some(proto::blame_buffer_response::BlameResponse {
4716            entries,
4717            messages,
4718            remote_url: blame.remote_url,
4719        }),
4720    }
4721}
4722
4723fn deserialize_blame_buffer_response(
4724    response: proto::BlameBufferResponse,
4725) -> Option<git::blame::Blame> {
4726    let response = response.blame_response?;
4727    let entries = response
4728        .entries
4729        .into_iter()
4730        .filter_map(|entry| {
4731            Some(git::blame::BlameEntry {
4732                sha: git::Oid::from_bytes(&entry.sha).ok()?,
4733                range: entry.start_line..entry.end_line,
4734                original_line_number: entry.original_line_number,
4735                committer_name: entry.committer,
4736                committer_time: entry.committer_time,
4737                committer_tz: entry.committer_tz,
4738                committer_email: entry.committer_mail,
4739                author: entry.author,
4740                author_mail: entry.author_mail,
4741                author_time: entry.author_time,
4742                author_tz: entry.author_tz,
4743                summary: entry.summary,
4744                previous: entry.previous,
4745                filename: entry.filename,
4746            })
4747        })
4748        .collect::<Vec<_>>();
4749
4750    let messages = response
4751        .messages
4752        .into_iter()
4753        .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
4754        .collect::<HashMap<_, _>>();
4755
4756    Some(Blame {
4757        entries,
4758        messages,
4759        remote_url: response.remote_url,
4760    })
4761}
4762
4763fn branch_to_proto(branch: &git::repository::Branch) -> proto::Branch {
4764    proto::Branch {
4765        is_head: branch.is_head,
4766        ref_name: branch.ref_name.to_string(),
4767        unix_timestamp: branch
4768            .most_recent_commit
4769            .as_ref()
4770            .map(|commit| commit.commit_timestamp as u64),
4771        upstream: branch.upstream.as_ref().map(|upstream| proto::GitUpstream {
4772            ref_name: upstream.ref_name.to_string(),
4773            tracking: upstream
4774                .tracking
4775                .status()
4776                .map(|upstream| proto::UpstreamTracking {
4777                    ahead: upstream.ahead as u64,
4778                    behind: upstream.behind as u64,
4779                }),
4780        }),
4781        most_recent_commit: branch
4782            .most_recent_commit
4783            .as_ref()
4784            .map(|commit| proto::CommitSummary {
4785                sha: commit.sha.to_string(),
4786                subject: commit.subject.to_string(),
4787                commit_timestamp: commit.commit_timestamp,
4788            }),
4789    }
4790}
4791
4792fn proto_to_branch(proto: &proto::Branch) -> git::repository::Branch {
4793    git::repository::Branch {
4794        is_head: proto.is_head,
4795        ref_name: proto.ref_name.clone().into(),
4796        upstream: proto
4797            .upstream
4798            .as_ref()
4799            .map(|upstream| git::repository::Upstream {
4800                ref_name: upstream.ref_name.to_string().into(),
4801                tracking: upstream
4802                    .tracking
4803                    .as_ref()
4804                    .map(|tracking| {
4805                        git::repository::UpstreamTracking::Tracked(UpstreamTrackingStatus {
4806                            ahead: tracking.ahead as u32,
4807                            behind: tracking.behind as u32,
4808                        })
4809                    })
4810                    .unwrap_or(git::repository::UpstreamTracking::Gone),
4811            }),
4812        most_recent_commit: proto.most_recent_commit.as_ref().map(|commit| {
4813            git::repository::CommitSummary {
4814                sha: commit.sha.to_string().into(),
4815                subject: commit.subject.to_string().into(),
4816                commit_timestamp: commit.commit_timestamp,
4817                has_parent: true,
4818            }
4819        }),
4820    }
4821}
4822
4823fn commit_details_to_proto(commit: &CommitDetails) -> proto::GitCommitDetails {
4824    proto::GitCommitDetails {
4825        sha: commit.sha.to_string(),
4826        message: commit.message.to_string(),
4827        commit_timestamp: commit.commit_timestamp,
4828        author_email: commit.author_email.to_string(),
4829        author_name: commit.author_name.to_string(),
4830    }
4831}
4832
4833fn proto_to_commit_details(proto: &proto::GitCommitDetails) -> CommitDetails {
4834    CommitDetails {
4835        sha: proto.sha.clone().into(),
4836        message: proto.message.clone().into(),
4837        commit_timestamp: proto.commit_timestamp,
4838        author_email: proto.author_email.clone().into(),
4839        author_name: proto.author_name.clone().into(),
4840    }
4841}
4842
4843async fn compute_snapshot(
4844    id: RepositoryId,
4845    work_directory_abs_path: Arc<Path>,
4846    prev_snapshot: RepositorySnapshot,
4847    backend: Arc<dyn GitRepository>,
4848) -> Result<(RepositorySnapshot, Vec<RepositoryEvent>)> {
4849    let mut events = Vec::new();
4850    let branches = backend.branches().await?;
4851    let branch = branches.into_iter().find(|branch| branch.is_head);
4852    let statuses = backend
4853        .status(std::slice::from_ref(&WORK_DIRECTORY_REPO_PATH))
4854        .await?;
4855    let statuses_by_path = SumTree::from_iter(
4856        statuses
4857            .entries
4858            .iter()
4859            .map(|(repo_path, status)| StatusEntry {
4860                repo_path: repo_path.clone(),
4861                status: *status,
4862            }),
4863        &(),
4864    );
4865    let (merge_details, merge_heads_changed) =
4866        MergeDetails::load(&backend, &statuses_by_path, &prev_snapshot).await?;
4867    log::debug!("new merge details (changed={merge_heads_changed:?}): {merge_details:?}");
4868
4869    if merge_heads_changed
4870        || branch != prev_snapshot.branch
4871        || statuses_by_path != prev_snapshot.statuses_by_path
4872    {
4873        events.push(RepositoryEvent::Updated {
4874            full_scan: true,
4875            new_instance: false,
4876        });
4877    }
4878
4879    // Cache merge conflict paths so they don't change from staging/unstaging,
4880    // until the merge heads change (at commit time, etc.).
4881    if merge_heads_changed {
4882        events.push(RepositoryEvent::MergeHeadsChanged);
4883    }
4884
4885    // Useful when branch is None in detached head state
4886    let head_commit = match backend.head_sha().await {
4887        Some(head_sha) => backend.show(head_sha).await.log_err(),
4888        None => None,
4889    };
4890
4891    // Used by edit prediction data collection
4892    let remote_origin_url = backend.remote_url("origin");
4893    let remote_upstream_url = backend.remote_url("upstream");
4894
4895    let snapshot = RepositorySnapshot {
4896        id,
4897        statuses_by_path,
4898        work_directory_abs_path,
4899        scan_id: prev_snapshot.scan_id + 1,
4900        branch,
4901        head_commit,
4902        merge: merge_details,
4903        remote_origin_url,
4904        remote_upstream_url,
4905    };
4906
4907    Ok((snapshot, events))
4908}
4909
4910fn status_from_proto(
4911    simple_status: i32,
4912    status: Option<proto::GitFileStatus>,
4913) -> anyhow::Result<FileStatus> {
4914    use proto::git_file_status::Variant;
4915
4916    let Some(variant) = status.and_then(|status| status.variant) else {
4917        let code = proto::GitStatus::from_i32(simple_status)
4918            .with_context(|| format!("Invalid git status code: {simple_status}"))?;
4919        let result = match code {
4920            proto::GitStatus::Added => TrackedStatus {
4921                worktree_status: StatusCode::Added,
4922                index_status: StatusCode::Unmodified,
4923            }
4924            .into(),
4925            proto::GitStatus::Modified => TrackedStatus {
4926                worktree_status: StatusCode::Modified,
4927                index_status: StatusCode::Unmodified,
4928            }
4929            .into(),
4930            proto::GitStatus::Conflict => UnmergedStatus {
4931                first_head: UnmergedStatusCode::Updated,
4932                second_head: UnmergedStatusCode::Updated,
4933            }
4934            .into(),
4935            proto::GitStatus::Deleted => TrackedStatus {
4936                worktree_status: StatusCode::Deleted,
4937                index_status: StatusCode::Unmodified,
4938            }
4939            .into(),
4940            _ => anyhow::bail!("Invalid code for simple status: {simple_status}"),
4941        };
4942        return Ok(result);
4943    };
4944
4945    let result = match variant {
4946        Variant::Untracked(_) => FileStatus::Untracked,
4947        Variant::Ignored(_) => FileStatus::Ignored,
4948        Variant::Unmerged(unmerged) => {
4949            let [first_head, second_head] =
4950                [unmerged.first_head, unmerged.second_head].map(|head| {
4951                    let code = proto::GitStatus::from_i32(head)
4952                        .with_context(|| format!("Invalid git status code: {head}"))?;
4953                    let result = match code {
4954                        proto::GitStatus::Added => UnmergedStatusCode::Added,
4955                        proto::GitStatus::Updated => UnmergedStatusCode::Updated,
4956                        proto::GitStatus::Deleted => UnmergedStatusCode::Deleted,
4957                        _ => anyhow::bail!("Invalid code for unmerged status: {code:?}"),
4958                    };
4959                    Ok(result)
4960                });
4961            let [first_head, second_head] = [first_head?, second_head?];
4962            UnmergedStatus {
4963                first_head,
4964                second_head,
4965            }
4966            .into()
4967        }
4968        Variant::Tracked(tracked) => {
4969            let [index_status, worktree_status] = [tracked.index_status, tracked.worktree_status]
4970                .map(|status| {
4971                    let code = proto::GitStatus::from_i32(status)
4972                        .with_context(|| format!("Invalid git status code: {status}"))?;
4973                    let result = match code {
4974                        proto::GitStatus::Modified => StatusCode::Modified,
4975                        proto::GitStatus::TypeChanged => StatusCode::TypeChanged,
4976                        proto::GitStatus::Added => StatusCode::Added,
4977                        proto::GitStatus::Deleted => StatusCode::Deleted,
4978                        proto::GitStatus::Renamed => StatusCode::Renamed,
4979                        proto::GitStatus::Copied => StatusCode::Copied,
4980                        proto::GitStatus::Unmodified => StatusCode::Unmodified,
4981                        _ => anyhow::bail!("Invalid code for tracked status: {code:?}"),
4982                    };
4983                    Ok(result)
4984                });
4985            let [index_status, worktree_status] = [index_status?, worktree_status?];
4986            TrackedStatus {
4987                index_status,
4988                worktree_status,
4989            }
4990            .into()
4991        }
4992    };
4993    Ok(result)
4994}
4995
4996fn status_to_proto(status: FileStatus) -> proto::GitFileStatus {
4997    use proto::git_file_status::{Tracked, Unmerged, Variant};
4998
4999    let variant = match status {
5000        FileStatus::Untracked => Variant::Untracked(Default::default()),
5001        FileStatus::Ignored => Variant::Ignored(Default::default()),
5002        FileStatus::Unmerged(UnmergedStatus {
5003            first_head,
5004            second_head,
5005        }) => Variant::Unmerged(Unmerged {
5006            first_head: unmerged_status_to_proto(first_head),
5007            second_head: unmerged_status_to_proto(second_head),
5008        }),
5009        FileStatus::Tracked(TrackedStatus {
5010            index_status,
5011            worktree_status,
5012        }) => Variant::Tracked(Tracked {
5013            index_status: tracked_status_to_proto(index_status),
5014            worktree_status: tracked_status_to_proto(worktree_status),
5015        }),
5016    };
5017    proto::GitFileStatus {
5018        variant: Some(variant),
5019    }
5020}
5021
5022fn unmerged_status_to_proto(code: UnmergedStatusCode) -> i32 {
5023    match code {
5024        UnmergedStatusCode::Added => proto::GitStatus::Added as _,
5025        UnmergedStatusCode::Deleted => proto::GitStatus::Deleted as _,
5026        UnmergedStatusCode::Updated => proto::GitStatus::Updated as _,
5027    }
5028}
5029
5030fn tracked_status_to_proto(code: StatusCode) -> i32 {
5031    match code {
5032        StatusCode::Added => proto::GitStatus::Added as _,
5033        StatusCode::Deleted => proto::GitStatus::Deleted as _,
5034        StatusCode::Modified => proto::GitStatus::Modified as _,
5035        StatusCode::Renamed => proto::GitStatus::Renamed as _,
5036        StatusCode::TypeChanged => proto::GitStatus::TypeChanged as _,
5037        StatusCode::Copied => proto::GitStatus::Copied as _,
5038        StatusCode::Unmodified => proto::GitStatus::Unmodified as _,
5039    }
5040}