git_store.rs

   1mod conflict_set;
   2pub mod git_traversal;
   3
   4use crate::{
   5    ProjectEnvironment, ProjectItem, ProjectPath,
   6    buffer_store::{BufferStore, BufferStoreEvent},
   7    worktree_store::{WorktreeStore, WorktreeStoreEvent},
   8};
   9use anyhow::{Context as _, Result, anyhow, bail};
  10use askpass::AskPassDelegate;
  11use buffer_diff::{BufferDiff, BufferDiffEvent};
  12use client::ProjectId;
  13use collections::HashMap;
  14pub use conflict_set::{ConflictRegion, ConflictSet, ConflictSetSnapshot, ConflictSetUpdate};
  15use fs::Fs;
  16use futures::{
  17    FutureExt, StreamExt as _,
  18    channel::{mpsc, oneshot},
  19    future::{self, Shared, try_join_all},
  20};
  21use git::{
  22    BuildPermalinkParams, GitHostingProviderRegistry, WORK_DIRECTORY_REPO_PATH,
  23    blame::Blame,
  24    parse_git_remote_url,
  25    repository::{
  26        Branch, CommitDetails, CommitDiff, CommitFile, CommitOptions, DiffType, GitRepository,
  27        GitRepositoryCheckpoint, PushOptions, Remote, RemoteCommandOutput, RepoPath, ResetMode,
  28        UpstreamTrackingStatus,
  29    },
  30    status::{
  31        FileStatus, GitSummary, StatusCode, TrackedStatus, UnmergedStatus, UnmergedStatusCode,
  32    },
  33};
  34use gpui::{
  35    App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Subscription, Task,
  36    WeakEntity,
  37};
  38use language::{
  39    Buffer, BufferEvent, Language, LanguageRegistry,
  40    proto::{deserialize_version, serialize_version},
  41};
  42use parking_lot::Mutex;
  43use postage::stream::Stream as _;
  44use rpc::{
  45    AnyProtoClient, TypedEnvelope,
  46    proto::{self, FromProto, SSH_PROJECT_ID, ToProto, git_reset, split_repository_update},
  47};
  48use serde::Deserialize;
  49use std::{
  50    cmp::Ordering,
  51    collections::{BTreeSet, VecDeque},
  52    future::Future,
  53    mem,
  54    ops::Range,
  55    path::{Path, PathBuf},
  56    sync::{
  57        Arc,
  58        atomic::{self, AtomicU64},
  59    },
  60    time::Instant,
  61};
  62use sum_tree::{Edit, SumTree, TreeSet};
  63use text::{Bias, BufferId};
  64use util::{ResultExt, debug_panic, post_inc};
  65use worktree::{
  66    File, PathKey, PathProgress, PathSummary, PathTarget, UpdatedGitRepositoriesSet,
  67    UpdatedGitRepository, Worktree,
  68};
  69
  70pub struct GitStore {
  71    state: GitStoreState,
  72    buffer_store: Entity<BufferStore>,
  73    worktree_store: Entity<WorktreeStore>,
  74    repositories: HashMap<RepositoryId, Entity<Repository>>,
  75    active_repo_id: Option<RepositoryId>,
  76    #[allow(clippy::type_complexity)]
  77    loading_diffs:
  78        HashMap<(BufferId, DiffKind), Shared<Task<Result<Entity<BufferDiff>, Arc<anyhow::Error>>>>>,
  79    diffs: HashMap<BufferId, Entity<BufferGitState>>,
  80    shared_diffs: HashMap<proto::PeerId, HashMap<BufferId, SharedDiffs>>,
  81    _subscriptions: Vec<Subscription>,
  82}
  83
  84#[derive(Default)]
  85struct SharedDiffs {
  86    unstaged: Option<Entity<BufferDiff>>,
  87    uncommitted: Option<Entity<BufferDiff>>,
  88}
  89
  90struct BufferGitState {
  91    unstaged_diff: Option<WeakEntity<BufferDiff>>,
  92    uncommitted_diff: Option<WeakEntity<BufferDiff>>,
  93    conflict_set: Option<WeakEntity<ConflictSet>>,
  94    recalculate_diff_task: Option<Task<Result<()>>>,
  95    reparse_conflict_markers_task: Option<Task<Result<()>>>,
  96    language: Option<Arc<Language>>,
  97    language_registry: Option<Arc<LanguageRegistry>>,
  98    conflict_updated_futures: Vec<oneshot::Sender<()>>,
  99    recalculating_tx: postage::watch::Sender<bool>,
 100
 101    /// These operation counts are used to ensure that head and index text
 102    /// values read from the git repository are up-to-date with any hunk staging
 103    /// operations that have been performed on the BufferDiff.
 104    ///
 105    /// The operation count is incremented immediately when the user initiates a
 106    /// hunk stage/unstage operation. Then, upon finishing writing the new index
 107    /// text do disk, the `operation count as of write` is updated to reflect
 108    /// the operation count that prompted the write.
 109    hunk_staging_operation_count: usize,
 110    hunk_staging_operation_count_as_of_write: usize,
 111
 112    head_text: Option<Arc<String>>,
 113    index_text: Option<Arc<String>>,
 114    head_changed: bool,
 115    index_changed: bool,
 116    language_changed: bool,
 117}
 118
 119#[derive(Clone, Debug)]
 120enum DiffBasesChange {
 121    SetIndex(Option<String>),
 122    SetHead(Option<String>),
 123    SetEach {
 124        index: Option<String>,
 125        head: Option<String>,
 126    },
 127    SetBoth(Option<String>),
 128}
 129
 130#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
 131enum DiffKind {
 132    Unstaged,
 133    Uncommitted,
 134}
 135
 136enum GitStoreState {
 137    Local {
 138        next_repository_id: Arc<AtomicU64>,
 139        downstream: Option<LocalDownstreamState>,
 140        project_environment: Entity<ProjectEnvironment>,
 141        fs: Arc<dyn Fs>,
 142    },
 143    Ssh {
 144        upstream_client: AnyProtoClient,
 145        upstream_project_id: ProjectId,
 146        downstream: Option<(AnyProtoClient, ProjectId)>,
 147    },
 148    Remote {
 149        upstream_client: AnyProtoClient,
 150        upstream_project_id: ProjectId,
 151    },
 152}
 153
 154enum DownstreamUpdate {
 155    UpdateRepository(RepositorySnapshot),
 156    RemoveRepository(RepositoryId),
 157}
 158
 159struct LocalDownstreamState {
 160    client: AnyProtoClient,
 161    project_id: ProjectId,
 162    updates_tx: mpsc::UnboundedSender<DownstreamUpdate>,
 163    _task: Task<Result<()>>,
 164}
 165
 166#[derive(Clone)]
 167pub struct GitStoreCheckpoint {
 168    checkpoints_by_work_dir_abs_path: HashMap<Arc<Path>, GitRepositoryCheckpoint>,
 169}
 170
 171#[derive(Clone, Debug, PartialEq, Eq)]
 172pub struct StatusEntry {
 173    pub repo_path: RepoPath,
 174    pub status: FileStatus,
 175}
 176
 177impl StatusEntry {
 178    fn to_proto(&self) -> proto::StatusEntry {
 179        let simple_status = match self.status {
 180            FileStatus::Ignored | FileStatus::Untracked => proto::GitStatus::Added as i32,
 181            FileStatus::Unmerged { .. } => proto::GitStatus::Conflict as i32,
 182            FileStatus::Tracked(TrackedStatus {
 183                index_status,
 184                worktree_status,
 185            }) => tracked_status_to_proto(if worktree_status != StatusCode::Unmodified {
 186                worktree_status
 187            } else {
 188                index_status
 189            }),
 190        };
 191
 192        proto::StatusEntry {
 193            repo_path: self.repo_path.as_ref().to_proto(),
 194            simple_status,
 195            status: Some(status_to_proto(self.status)),
 196        }
 197    }
 198}
 199
 200impl TryFrom<proto::StatusEntry> for StatusEntry {
 201    type Error = anyhow::Error;
 202
 203    fn try_from(value: proto::StatusEntry) -> Result<Self, Self::Error> {
 204        let repo_path = RepoPath(Arc::<Path>::from_proto(value.repo_path));
 205        let status = status_from_proto(value.simple_status, value.status)?;
 206        Ok(Self { repo_path, status })
 207    }
 208}
 209
 210impl sum_tree::Item for StatusEntry {
 211    type Summary = PathSummary<GitSummary>;
 212
 213    fn summary(&self, _: &<Self::Summary as sum_tree::Summary>::Context) -> Self::Summary {
 214        PathSummary {
 215            max_path: self.repo_path.0.clone(),
 216            item_summary: self.status.summary(),
 217        }
 218    }
 219}
 220
 221impl sum_tree::KeyedItem for StatusEntry {
 222    type Key = PathKey;
 223
 224    fn key(&self) -> Self::Key {
 225        PathKey(self.repo_path.0.clone())
 226    }
 227}
 228
 229#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
 230pub struct RepositoryId(pub u64);
 231
 232#[derive(Clone, Debug, Default, PartialEq, Eq)]
 233pub struct MergeDetails {
 234    pub conflicted_paths: TreeSet<RepoPath>,
 235    pub message: Option<SharedString>,
 236    pub apply_head: Option<CommitDetails>,
 237    pub cherry_pick_head: Option<CommitDetails>,
 238    pub merge_heads: Vec<CommitDetails>,
 239    pub rebase_head: Option<CommitDetails>,
 240    pub revert_head: Option<CommitDetails>,
 241}
 242
 243#[derive(Clone, Debug, PartialEq, Eq)]
 244pub struct RepositorySnapshot {
 245    pub id: RepositoryId,
 246    pub statuses_by_path: SumTree<StatusEntry>,
 247    pub work_directory_abs_path: Arc<Path>,
 248    pub branch: Option<Branch>,
 249    pub head_commit: Option<CommitDetails>,
 250    pub scan_id: u64,
 251    pub merge: MergeDetails,
 252}
 253
 254type JobId = u64;
 255
 256#[derive(Clone, Debug, PartialEq, Eq)]
 257pub struct JobInfo {
 258    pub start: Instant,
 259    pub message: SharedString,
 260}
 261
 262pub struct Repository {
 263    this: WeakEntity<Self>,
 264    snapshot: RepositorySnapshot,
 265    commit_message_buffer: Option<Entity<Buffer>>,
 266    git_store: WeakEntity<GitStore>,
 267    // For a local repository, holds paths that have had worktree events since the last status scan completed,
 268    // and that should be examined during the next status scan.
 269    paths_needing_status_update: BTreeSet<RepoPath>,
 270    job_sender: mpsc::UnboundedSender<GitJob>,
 271    active_jobs: HashMap<JobId, JobInfo>,
 272    job_id: JobId,
 273    askpass_delegates: Arc<Mutex<HashMap<u64, AskPassDelegate>>>,
 274    latest_askpass_id: u64,
 275}
 276
 277impl std::ops::Deref for Repository {
 278    type Target = RepositorySnapshot;
 279
 280    fn deref(&self) -> &Self::Target {
 281        &self.snapshot
 282    }
 283}
 284
 285#[derive(Clone)]
 286pub enum RepositoryState {
 287    Local {
 288        backend: Arc<dyn GitRepository>,
 289        environment: Arc<HashMap<String, String>>,
 290    },
 291    Remote {
 292        project_id: ProjectId,
 293        client: AnyProtoClient,
 294    },
 295}
 296
 297#[derive(Clone, Debug)]
 298pub enum RepositoryEvent {
 299    Updated { full_scan: bool },
 300    MergeHeadsChanged,
 301}
 302
 303#[derive(Clone, Debug)]
 304pub struct JobsUpdated;
 305
 306#[derive(Debug)]
 307pub enum GitStoreEvent {
 308    ActiveRepositoryChanged(Option<RepositoryId>),
 309    RepositoryUpdated(RepositoryId, RepositoryEvent, bool),
 310    RepositoryAdded(RepositoryId),
 311    RepositoryRemoved(RepositoryId),
 312    IndexWriteError(anyhow::Error),
 313    JobsUpdated,
 314    ConflictsUpdated,
 315}
 316
 317impl EventEmitter<RepositoryEvent> for Repository {}
 318impl EventEmitter<JobsUpdated> for Repository {}
 319impl EventEmitter<GitStoreEvent> for GitStore {}
 320
 321pub struct GitJob {
 322    job: Box<dyn FnOnce(RepositoryState, &mut AsyncApp) -> Task<()>>,
 323    key: Option<GitJobKey>,
 324}
 325
 326#[derive(PartialEq, Eq)]
 327enum GitJobKey {
 328    WriteIndex(RepoPath),
 329    ReloadBufferDiffBases,
 330    RefreshStatuses,
 331    ReloadGitState,
 332}
 333
 334impl GitStore {
 335    pub fn local(
 336        worktree_store: &Entity<WorktreeStore>,
 337        buffer_store: Entity<BufferStore>,
 338        environment: Entity<ProjectEnvironment>,
 339        fs: Arc<dyn Fs>,
 340        cx: &mut Context<Self>,
 341    ) -> Self {
 342        Self::new(
 343            worktree_store.clone(),
 344            buffer_store,
 345            GitStoreState::Local {
 346                next_repository_id: Arc::new(AtomicU64::new(1)),
 347                downstream: None,
 348                project_environment: environment,
 349                fs,
 350            },
 351            cx,
 352        )
 353    }
 354
 355    pub fn remote(
 356        worktree_store: &Entity<WorktreeStore>,
 357        buffer_store: Entity<BufferStore>,
 358        upstream_client: AnyProtoClient,
 359        project_id: ProjectId,
 360        cx: &mut Context<Self>,
 361    ) -> Self {
 362        Self::new(
 363            worktree_store.clone(),
 364            buffer_store,
 365            GitStoreState::Remote {
 366                upstream_client,
 367                upstream_project_id: project_id,
 368            },
 369            cx,
 370        )
 371    }
 372
 373    pub fn ssh(
 374        worktree_store: &Entity<WorktreeStore>,
 375        buffer_store: Entity<BufferStore>,
 376        upstream_client: AnyProtoClient,
 377        cx: &mut Context<Self>,
 378    ) -> Self {
 379        Self::new(
 380            worktree_store.clone(),
 381            buffer_store,
 382            GitStoreState::Ssh {
 383                upstream_client,
 384                upstream_project_id: ProjectId(SSH_PROJECT_ID),
 385                downstream: None,
 386            },
 387            cx,
 388        )
 389    }
 390
 391    fn new(
 392        worktree_store: Entity<WorktreeStore>,
 393        buffer_store: Entity<BufferStore>,
 394        state: GitStoreState,
 395        cx: &mut Context<Self>,
 396    ) -> Self {
 397        let _subscriptions = vec![
 398            cx.subscribe(&worktree_store, Self::on_worktree_store_event),
 399            cx.subscribe(&buffer_store, Self::on_buffer_store_event),
 400        ];
 401
 402        GitStore {
 403            state,
 404            buffer_store,
 405            worktree_store,
 406            repositories: HashMap::default(),
 407            active_repo_id: None,
 408            _subscriptions,
 409            loading_diffs: HashMap::default(),
 410            shared_diffs: HashMap::default(),
 411            diffs: HashMap::default(),
 412        }
 413    }
 414
 415    pub fn init(client: &AnyProtoClient) {
 416        client.add_entity_request_handler(Self::handle_get_remotes);
 417        client.add_entity_request_handler(Self::handle_get_branches);
 418        client.add_entity_request_handler(Self::handle_change_branch);
 419        client.add_entity_request_handler(Self::handle_create_branch);
 420        client.add_entity_request_handler(Self::handle_git_init);
 421        client.add_entity_request_handler(Self::handle_push);
 422        client.add_entity_request_handler(Self::handle_pull);
 423        client.add_entity_request_handler(Self::handle_fetch);
 424        client.add_entity_request_handler(Self::handle_stage);
 425        client.add_entity_request_handler(Self::handle_unstage);
 426        client.add_entity_request_handler(Self::handle_commit);
 427        client.add_entity_request_handler(Self::handle_reset);
 428        client.add_entity_request_handler(Self::handle_show);
 429        client.add_entity_request_handler(Self::handle_load_commit_diff);
 430        client.add_entity_request_handler(Self::handle_checkout_files);
 431        client.add_entity_request_handler(Self::handle_open_commit_message_buffer);
 432        client.add_entity_request_handler(Self::handle_set_index_text);
 433        client.add_entity_request_handler(Self::handle_askpass);
 434        client.add_entity_request_handler(Self::handle_check_for_pushed_commits);
 435        client.add_entity_request_handler(Self::handle_git_diff);
 436        client.add_entity_request_handler(Self::handle_open_unstaged_diff);
 437        client.add_entity_request_handler(Self::handle_open_uncommitted_diff);
 438        client.add_entity_message_handler(Self::handle_update_diff_bases);
 439        client.add_entity_request_handler(Self::handle_get_permalink_to_line);
 440        client.add_entity_request_handler(Self::handle_blame_buffer);
 441        client.add_entity_message_handler(Self::handle_update_repository);
 442        client.add_entity_message_handler(Self::handle_remove_repository);
 443    }
 444
 445    pub fn is_local(&self) -> bool {
 446        matches!(self.state, GitStoreState::Local { .. })
 447    }
 448
 449    pub fn shared(&mut self, project_id: u64, client: AnyProtoClient, cx: &mut Context<Self>) {
 450        match &mut self.state {
 451            GitStoreState::Ssh {
 452                downstream: downstream_client,
 453                ..
 454            } => {
 455                for repo in self.repositories.values() {
 456                    let update = repo.read(cx).snapshot.initial_update(project_id);
 457                    for update in split_repository_update(update) {
 458                        client.send(update).log_err();
 459                    }
 460                }
 461                *downstream_client = Some((client, ProjectId(project_id)));
 462            }
 463            GitStoreState::Local {
 464                downstream: downstream_client,
 465                ..
 466            } => {
 467                let mut snapshots = HashMap::default();
 468                let (updates_tx, mut updates_rx) = mpsc::unbounded();
 469                for repo in self.repositories.values() {
 470                    updates_tx
 471                        .unbounded_send(DownstreamUpdate::UpdateRepository(
 472                            repo.read(cx).snapshot.clone(),
 473                        ))
 474                        .ok();
 475                }
 476                *downstream_client = Some(LocalDownstreamState {
 477                    client: client.clone(),
 478                    project_id: ProjectId(project_id),
 479                    updates_tx,
 480                    _task: cx.spawn(async move |this, cx| {
 481                        cx.background_spawn(async move {
 482                            while let Some(update) = updates_rx.next().await {
 483                                match update {
 484                                    DownstreamUpdate::UpdateRepository(snapshot) => {
 485                                        if let Some(old_snapshot) = snapshots.get_mut(&snapshot.id)
 486                                        {
 487                                            let update =
 488                                                snapshot.build_update(old_snapshot, project_id);
 489                                            *old_snapshot = snapshot;
 490                                            for update in split_repository_update(update) {
 491                                                client.send(update)?;
 492                                            }
 493                                        } else {
 494                                            let update = snapshot.initial_update(project_id);
 495                                            for update in split_repository_update(update) {
 496                                                client.send(update)?;
 497                                            }
 498                                            snapshots.insert(snapshot.id, snapshot);
 499                                        }
 500                                    }
 501                                    DownstreamUpdate::RemoveRepository(id) => {
 502                                        client.send(proto::RemoveRepository {
 503                                            project_id,
 504                                            id: id.to_proto(),
 505                                        })?;
 506                                    }
 507                                }
 508                            }
 509                            anyhow::Ok(())
 510                        })
 511                        .await
 512                        .ok();
 513                        this.update(cx, |this, _| {
 514                            if let GitStoreState::Local {
 515                                downstream: downstream_client,
 516                                ..
 517                            } = &mut this.state
 518                            {
 519                                downstream_client.take();
 520                            } else {
 521                                unreachable!("unshared called on remote store");
 522                            }
 523                        })
 524                    }),
 525                });
 526            }
 527            GitStoreState::Remote { .. } => {
 528                debug_panic!("shared called on remote store");
 529            }
 530        }
 531    }
 532
 533    pub fn unshared(&mut self, _cx: &mut Context<Self>) {
 534        match &mut self.state {
 535            GitStoreState::Local {
 536                downstream: downstream_client,
 537                ..
 538            } => {
 539                downstream_client.take();
 540            }
 541            GitStoreState::Ssh {
 542                downstream: downstream_client,
 543                ..
 544            } => {
 545                downstream_client.take();
 546            }
 547            GitStoreState::Remote { .. } => {
 548                debug_panic!("unshared called on remote store");
 549            }
 550        }
 551        self.shared_diffs.clear();
 552    }
 553
 554    pub(crate) fn forget_shared_diffs_for(&mut self, peer_id: &proto::PeerId) {
 555        self.shared_diffs.remove(peer_id);
 556    }
 557
 558    pub fn active_repository(&self) -> Option<Entity<Repository>> {
 559        self.active_repo_id
 560            .as_ref()
 561            .map(|id| self.repositories[&id].clone())
 562    }
 563
 564    pub fn open_unstaged_diff(
 565        &mut self,
 566        buffer: Entity<Buffer>,
 567        cx: &mut Context<Self>,
 568    ) -> Task<Result<Entity<BufferDiff>>> {
 569        let buffer_id = buffer.read(cx).remote_id();
 570        if let Some(diff_state) = self.diffs.get(&buffer_id) {
 571            if let Some(unstaged_diff) = diff_state
 572                .read(cx)
 573                .unstaged_diff
 574                .as_ref()
 575                .and_then(|weak| weak.upgrade())
 576            {
 577                if let Some(task) =
 578                    diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
 579                {
 580                    return cx.background_executor().spawn(async move {
 581                        task.await;
 582                        Ok(unstaged_diff)
 583                    });
 584                }
 585                return Task::ready(Ok(unstaged_diff));
 586            }
 587        }
 588
 589        let Some((repo, repo_path)) =
 590            self.repository_and_path_for_buffer_id(buffer.read(cx).remote_id(), cx)
 591        else {
 592            return Task::ready(Err(anyhow!("failed to find git repository for buffer")));
 593        };
 594
 595        let task = self
 596            .loading_diffs
 597            .entry((buffer_id, DiffKind::Unstaged))
 598            .or_insert_with(|| {
 599                let staged_text = repo.update(cx, |repo, cx| {
 600                    repo.load_staged_text(buffer_id, repo_path, cx)
 601                });
 602                cx.spawn(async move |this, cx| {
 603                    Self::open_diff_internal(
 604                        this,
 605                        DiffKind::Unstaged,
 606                        staged_text.await.map(DiffBasesChange::SetIndex),
 607                        buffer,
 608                        cx,
 609                    )
 610                    .await
 611                    .map_err(Arc::new)
 612                })
 613                .shared()
 614            })
 615            .clone();
 616
 617        cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
 618    }
 619
 620    pub fn open_uncommitted_diff(
 621        &mut self,
 622        buffer: Entity<Buffer>,
 623        cx: &mut Context<Self>,
 624    ) -> Task<Result<Entity<BufferDiff>>> {
 625        let buffer_id = buffer.read(cx).remote_id();
 626
 627        if let Some(diff_state) = self.diffs.get(&buffer_id) {
 628            if let Some(uncommitted_diff) = diff_state
 629                .read(cx)
 630                .uncommitted_diff
 631                .as_ref()
 632                .and_then(|weak| weak.upgrade())
 633            {
 634                if let Some(task) =
 635                    diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
 636                {
 637                    return cx.background_executor().spawn(async move {
 638                        task.await;
 639                        Ok(uncommitted_diff)
 640                    });
 641                }
 642                return Task::ready(Ok(uncommitted_diff));
 643            }
 644        }
 645
 646        let Some((repo, repo_path)) =
 647            self.repository_and_path_for_buffer_id(buffer.read(cx).remote_id(), cx)
 648        else {
 649            return Task::ready(Err(anyhow!("failed to find git repository for buffer")));
 650        };
 651
 652        let task = self
 653            .loading_diffs
 654            .entry((buffer_id, DiffKind::Uncommitted))
 655            .or_insert_with(|| {
 656                let changes = repo.update(cx, |repo, cx| {
 657                    repo.load_committed_text(buffer_id, repo_path, cx)
 658                });
 659
 660                cx.spawn(async move |this, cx| {
 661                    Self::open_diff_internal(this, DiffKind::Uncommitted, changes.await, buffer, cx)
 662                        .await
 663                        .map_err(Arc::new)
 664                })
 665                .shared()
 666            })
 667            .clone();
 668
 669        cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
 670    }
 671
 672    async fn open_diff_internal(
 673        this: WeakEntity<Self>,
 674        kind: DiffKind,
 675        texts: Result<DiffBasesChange>,
 676        buffer_entity: Entity<Buffer>,
 677        cx: &mut AsyncApp,
 678    ) -> Result<Entity<BufferDiff>> {
 679        let diff_bases_change = match texts {
 680            Err(e) => {
 681                this.update(cx, |this, cx| {
 682                    let buffer = buffer_entity.read(cx);
 683                    let buffer_id = buffer.remote_id();
 684                    this.loading_diffs.remove(&(buffer_id, kind));
 685                })?;
 686                return Err(e);
 687            }
 688            Ok(change) => change,
 689        };
 690
 691        this.update(cx, |this, cx| {
 692            let buffer = buffer_entity.read(cx);
 693            let buffer_id = buffer.remote_id();
 694            let language = buffer.language().cloned();
 695            let language_registry = buffer.language_registry();
 696            let text_snapshot = buffer.text_snapshot();
 697            this.loading_diffs.remove(&(buffer_id, kind));
 698
 699            let git_store = cx.weak_entity();
 700            let diff_state = this
 701                .diffs
 702                .entry(buffer_id)
 703                .or_insert_with(|| cx.new(|_| BufferGitState::new(git_store)));
 704
 705            let diff = cx.new(|cx| BufferDiff::new(&text_snapshot, cx));
 706
 707            cx.subscribe(&diff, Self::on_buffer_diff_event).detach();
 708            diff_state.update(cx, |diff_state, cx| {
 709                diff_state.language = language;
 710                diff_state.language_registry = language_registry;
 711
 712                match kind {
 713                    DiffKind::Unstaged => diff_state.unstaged_diff = Some(diff.downgrade()),
 714                    DiffKind::Uncommitted => {
 715                        let unstaged_diff = if let Some(diff) = diff_state.unstaged_diff() {
 716                            diff
 717                        } else {
 718                            let unstaged_diff = cx.new(|cx| BufferDiff::new(&text_snapshot, cx));
 719                            diff_state.unstaged_diff = Some(unstaged_diff.downgrade());
 720                            unstaged_diff
 721                        };
 722
 723                        diff.update(cx, |diff, _| diff.set_secondary_diff(unstaged_diff));
 724                        diff_state.uncommitted_diff = Some(diff.downgrade())
 725                    }
 726                }
 727
 728                diff_state.diff_bases_changed(text_snapshot, Some(diff_bases_change), cx);
 729                let rx = diff_state.wait_for_recalculation();
 730
 731                anyhow::Ok(async move {
 732                    if let Some(rx) = rx {
 733                        rx.await;
 734                    }
 735                    Ok(diff)
 736                })
 737            })
 738        })??
 739        .await
 740    }
 741
 742    pub fn get_unstaged_diff(&self, buffer_id: BufferId, cx: &App) -> Option<Entity<BufferDiff>> {
 743        let diff_state = self.diffs.get(&buffer_id)?;
 744        diff_state.read(cx).unstaged_diff.as_ref()?.upgrade()
 745    }
 746
 747    pub fn get_uncommitted_diff(
 748        &self,
 749        buffer_id: BufferId,
 750        cx: &App,
 751    ) -> Option<Entity<BufferDiff>> {
 752        let diff_state = self.diffs.get(&buffer_id)?;
 753        diff_state.read(cx).uncommitted_diff.as_ref()?.upgrade()
 754    }
 755
 756    pub fn open_conflict_set(
 757        &mut self,
 758        buffer: Entity<Buffer>,
 759        cx: &mut Context<Self>,
 760    ) -> Entity<ConflictSet> {
 761        log::debug!("open conflict set");
 762        let buffer_id = buffer.read(cx).remote_id();
 763
 764        if let Some(git_state) = self.diffs.get(&buffer_id) {
 765            if let Some(conflict_set) = git_state
 766                .read(cx)
 767                .conflict_set
 768                .as_ref()
 769                .and_then(|weak| weak.upgrade())
 770            {
 771                let conflict_set = conflict_set.clone();
 772                let buffer_snapshot = buffer.read(cx).text_snapshot();
 773
 774                git_state.update(cx, |state, cx| {
 775                    let _ = state.reparse_conflict_markers(buffer_snapshot, cx);
 776                });
 777
 778                return conflict_set;
 779            }
 780        }
 781
 782        let is_unmerged = self
 783            .repository_and_path_for_buffer_id(buffer_id, cx)
 784            .map_or(false, |(repo, path)| {
 785                repo.read(cx)
 786                    .snapshot
 787                    .merge
 788                    .conflicted_paths
 789                    .contains(&path)
 790            });
 791        let git_store = cx.weak_entity();
 792        let buffer_git_state = self
 793            .diffs
 794            .entry(buffer_id)
 795            .or_insert_with(|| cx.new(|_| BufferGitState::new(git_store)));
 796        let conflict_set = cx.new(|cx| ConflictSet::new(buffer_id, is_unmerged, cx));
 797
 798        self._subscriptions
 799            .push(cx.subscribe(&conflict_set, |_, _, _, cx| {
 800                cx.emit(GitStoreEvent::ConflictsUpdated);
 801            }));
 802
 803        buffer_git_state.update(cx, |state, cx| {
 804            state.conflict_set = Some(conflict_set.downgrade());
 805            let buffer_snapshot = buffer.read(cx).text_snapshot();
 806            let _ = state.reparse_conflict_markers(buffer_snapshot, cx);
 807        });
 808
 809        conflict_set
 810    }
 811
 812    pub fn project_path_git_status(
 813        &self,
 814        project_path: &ProjectPath,
 815        cx: &App,
 816    ) -> Option<FileStatus> {
 817        let (repo, repo_path) = self.repository_and_path_for_project_path(project_path, cx)?;
 818        Some(repo.read(cx).status_for_path(&repo_path)?.status)
 819    }
 820
 821    pub fn checkpoint(&self, cx: &mut App) -> Task<Result<GitStoreCheckpoint>> {
 822        let mut work_directory_abs_paths = Vec::new();
 823        let mut checkpoints = Vec::new();
 824        for repository in self.repositories.values() {
 825            repository.update(cx, |repository, _| {
 826                work_directory_abs_paths.push(repository.snapshot.work_directory_abs_path.clone());
 827                checkpoints.push(repository.checkpoint().map(|checkpoint| checkpoint?));
 828            });
 829        }
 830
 831        cx.background_executor().spawn(async move {
 832            let checkpoints = future::try_join_all(checkpoints).await?;
 833            Ok(GitStoreCheckpoint {
 834                checkpoints_by_work_dir_abs_path: work_directory_abs_paths
 835                    .into_iter()
 836                    .zip(checkpoints)
 837                    .collect(),
 838            })
 839        })
 840    }
 841
 842    pub fn restore_checkpoint(
 843        &self,
 844        checkpoint: GitStoreCheckpoint,
 845        cx: &mut App,
 846    ) -> Task<Result<()>> {
 847        let repositories_by_work_dir_abs_path = self
 848            .repositories
 849            .values()
 850            .map(|repo| (repo.read(cx).snapshot.work_directory_abs_path.clone(), repo))
 851            .collect::<HashMap<_, _>>();
 852
 853        let mut tasks = Vec::new();
 854        for (work_dir_abs_path, checkpoint) in checkpoint.checkpoints_by_work_dir_abs_path {
 855            if let Some(repository) = repositories_by_work_dir_abs_path.get(&work_dir_abs_path) {
 856                let restore = repository.update(cx, |repository, _| {
 857                    repository.restore_checkpoint(checkpoint)
 858                });
 859                tasks.push(async move { restore.await? });
 860            }
 861        }
 862        cx.background_spawn(async move {
 863            future::try_join_all(tasks).await?;
 864            Ok(())
 865        })
 866    }
 867
 868    /// Compares two checkpoints, returning true if they are equal.
 869    pub fn compare_checkpoints(
 870        &self,
 871        left: GitStoreCheckpoint,
 872        mut right: GitStoreCheckpoint,
 873        cx: &mut App,
 874    ) -> Task<Result<bool>> {
 875        let repositories_by_work_dir_abs_path = self
 876            .repositories
 877            .values()
 878            .map(|repo| (repo.read(cx).snapshot.work_directory_abs_path.clone(), repo))
 879            .collect::<HashMap<_, _>>();
 880
 881        let mut tasks = Vec::new();
 882        for (work_dir_abs_path, left_checkpoint) in left.checkpoints_by_work_dir_abs_path {
 883            if let Some(right_checkpoint) = right
 884                .checkpoints_by_work_dir_abs_path
 885                .remove(&work_dir_abs_path)
 886            {
 887                if let Some(repository) = repositories_by_work_dir_abs_path.get(&work_dir_abs_path)
 888                {
 889                    let compare = repository.update(cx, |repository, _| {
 890                        repository.compare_checkpoints(left_checkpoint, right_checkpoint)
 891                    });
 892
 893                    tasks.push(async move { compare.await? });
 894                }
 895            } else {
 896                return Task::ready(Ok(false));
 897            }
 898        }
 899        cx.background_spawn(async move {
 900            Ok(future::try_join_all(tasks)
 901                .await?
 902                .into_iter()
 903                .all(|result| result))
 904        })
 905    }
 906
 907    /// Blames a buffer.
 908    pub fn blame_buffer(
 909        &self,
 910        buffer: &Entity<Buffer>,
 911        version: Option<clock::Global>,
 912        cx: &mut App,
 913    ) -> Task<Result<Option<Blame>>> {
 914        let buffer = buffer.read(cx);
 915        let Some((repo, repo_path)) =
 916            self.repository_and_path_for_buffer_id(buffer.remote_id(), cx)
 917        else {
 918            return Task::ready(Err(anyhow!("failed to find a git repository for buffer")));
 919        };
 920        let content = match &version {
 921            Some(version) => buffer.rope_for_version(version).clone(),
 922            None => buffer.as_rope().clone(),
 923        };
 924        let version = version.unwrap_or(buffer.version());
 925        let buffer_id = buffer.remote_id();
 926
 927        let rx = repo.update(cx, |repo, _| {
 928            repo.send_job(None, move |state, _| async move {
 929                match state {
 930                    RepositoryState::Local { backend, .. } => backend
 931                        .blame(repo_path.clone(), content)
 932                        .await
 933                        .with_context(|| format!("Failed to blame {:?}", repo_path.0))
 934                        .map(Some),
 935                    RepositoryState::Remote { project_id, client } => {
 936                        let response = client
 937                            .request(proto::BlameBuffer {
 938                                project_id: project_id.to_proto(),
 939                                buffer_id: buffer_id.into(),
 940                                version: serialize_version(&version),
 941                            })
 942                            .await?;
 943                        Ok(deserialize_blame_buffer_response(response))
 944                    }
 945                }
 946            })
 947        });
 948
 949        cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
 950    }
 951
 952    pub fn get_permalink_to_line(
 953        &self,
 954        buffer: &Entity<Buffer>,
 955        selection: Range<u32>,
 956        cx: &mut App,
 957    ) -> Task<Result<url::Url>> {
 958        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
 959            return Task::ready(Err(anyhow!("buffer has no file")));
 960        };
 961
 962        let Some((repo, repo_path)) = self.repository_and_path_for_project_path(
 963            &(file.worktree.read(cx).id(), file.path.clone()).into(),
 964            cx,
 965        ) else {
 966            // If we're not in a Git repo, check whether this is a Rust source
 967            // file in the Cargo registry (presumably opened with go-to-definition
 968            // from a normal Rust file). If so, we can put together a permalink
 969            // using crate metadata.
 970            if buffer
 971                .read(cx)
 972                .language()
 973                .is_none_or(|lang| lang.name() != "Rust".into())
 974            {
 975                return Task::ready(Err(anyhow!("no permalink available")));
 976            }
 977            let Some(file_path) = file.worktree.read(cx).absolutize(&file.path).ok() else {
 978                return Task::ready(Err(anyhow!("no permalink available")));
 979            };
 980            return cx.spawn(async move |cx| {
 981                let provider_registry = cx.update(GitHostingProviderRegistry::default_global)?;
 982                get_permalink_in_rust_registry_src(provider_registry, file_path, selection)
 983                    .map_err(|_| anyhow!("no permalink available"))
 984            });
 985
 986            // TODO remote case
 987        };
 988
 989        let buffer_id = buffer.read(cx).remote_id();
 990        let branch = repo.read(cx).branch.clone();
 991        let remote = branch
 992            .as_ref()
 993            .and_then(|b| b.upstream.as_ref())
 994            .and_then(|b| b.remote_name())
 995            .unwrap_or("origin")
 996            .to_string();
 997
 998        let rx = repo.update(cx, |repo, _| {
 999            repo.send_job(None, move |state, cx| async move {
1000                match state {
1001                    RepositoryState::Local { backend, .. } => {
1002                        let origin_url = backend
1003                            .remote_url(&remote)
1004                            .ok_or_else(|| anyhow!("remote \"{remote}\" not found"))?;
1005
1006                        let sha = backend
1007                            .head_sha()
1008                            .ok_or_else(|| anyhow!("failed to read HEAD SHA"))?;
1009
1010                        let provider_registry =
1011                            cx.update(GitHostingProviderRegistry::default_global)?;
1012
1013                        let (provider, remote) =
1014                            parse_git_remote_url(provider_registry, &origin_url)
1015                                .ok_or_else(|| anyhow!("failed to parse Git remote URL"))?;
1016
1017                        let path = repo_path
1018                            .to_str()
1019                            .ok_or_else(|| anyhow!("failed to convert path to string"))?;
1020
1021                        Ok(provider.build_permalink(
1022                            remote,
1023                            BuildPermalinkParams {
1024                                sha: &sha,
1025                                path,
1026                                selection: Some(selection),
1027                            },
1028                        ))
1029                    }
1030                    RepositoryState::Remote { project_id, client } => {
1031                        let response = client
1032                            .request(proto::GetPermalinkToLine {
1033                                project_id: project_id.to_proto(),
1034                                buffer_id: buffer_id.into(),
1035                                selection: Some(proto::Range {
1036                                    start: selection.start as u64,
1037                                    end: selection.end as u64,
1038                                }),
1039                            })
1040                            .await?;
1041
1042                        url::Url::parse(&response.permalink).context("failed to parse permalink")
1043                    }
1044                }
1045            })
1046        });
1047        cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
1048    }
1049
1050    fn downstream_client(&self) -> Option<(AnyProtoClient, ProjectId)> {
1051        match &self.state {
1052            GitStoreState::Local {
1053                downstream: downstream_client,
1054                ..
1055            } => downstream_client
1056                .as_ref()
1057                .map(|state| (state.client.clone(), state.project_id)),
1058            GitStoreState::Ssh {
1059                downstream: downstream_client,
1060                ..
1061            } => downstream_client.clone(),
1062            GitStoreState::Remote { .. } => None,
1063        }
1064    }
1065
1066    fn upstream_client(&self) -> Option<AnyProtoClient> {
1067        match &self.state {
1068            GitStoreState::Local { .. } => None,
1069            GitStoreState::Ssh {
1070                upstream_client, ..
1071            }
1072            | GitStoreState::Remote {
1073                upstream_client, ..
1074            } => Some(upstream_client.clone()),
1075        }
1076    }
1077
1078    fn on_worktree_store_event(
1079        &mut self,
1080        worktree_store: Entity<WorktreeStore>,
1081        event: &WorktreeStoreEvent,
1082        cx: &mut Context<Self>,
1083    ) {
1084        let GitStoreState::Local {
1085            project_environment,
1086            downstream,
1087            next_repository_id,
1088            fs,
1089        } = &self.state
1090        else {
1091            return;
1092        };
1093
1094        match event {
1095            WorktreeStoreEvent::WorktreeUpdatedEntries(worktree_id, updated_entries) => {
1096                let mut paths_by_git_repo = HashMap::<_, Vec<_>>::default();
1097                for (relative_path, _, _) in updated_entries.iter() {
1098                    let Some((repo, repo_path)) = self.repository_and_path_for_project_path(
1099                        &(*worktree_id, relative_path.clone()).into(),
1100                        cx,
1101                    ) else {
1102                        continue;
1103                    };
1104                    paths_by_git_repo.entry(repo).or_default().push(repo_path)
1105                }
1106
1107                for (repo, paths) in paths_by_git_repo {
1108                    repo.update(cx, |repo, cx| {
1109                        repo.paths_changed(
1110                            paths,
1111                            downstream
1112                                .as_ref()
1113                                .map(|downstream| downstream.updates_tx.clone()),
1114                            cx,
1115                        );
1116                    });
1117                }
1118            }
1119            WorktreeStoreEvent::WorktreeUpdatedGitRepositories(worktree_id, changed_repos) => {
1120                let Some(worktree) = worktree_store.read(cx).worktree_for_id(*worktree_id, cx)
1121                else {
1122                    return;
1123                };
1124                if !worktree.read(cx).is_visible() {
1125                    log::debug!(
1126                        "not adding repositories for local worktree {:?} because it's not visible",
1127                        worktree.read(cx).abs_path()
1128                    );
1129                    return;
1130                }
1131                self.update_repositories_from_worktree(
1132                    project_environment.clone(),
1133                    next_repository_id.clone(),
1134                    downstream
1135                        .as_ref()
1136                        .map(|downstream| downstream.updates_tx.clone()),
1137                    changed_repos.clone(),
1138                    fs.clone(),
1139                    cx,
1140                );
1141                self.local_worktree_git_repos_changed(worktree, changed_repos, cx);
1142            }
1143            _ => {}
1144        }
1145    }
1146
1147    fn on_repository_event(
1148        &mut self,
1149        repo: Entity<Repository>,
1150        event: &RepositoryEvent,
1151        cx: &mut Context<Self>,
1152    ) {
1153        let id = repo.read(cx).id;
1154        let merge_conflicts = repo.read(cx).snapshot.merge.conflicted_paths.clone();
1155        for (buffer_id, diff) in self.diffs.iter() {
1156            if let Some((buffer_repo, repo_path)) =
1157                self.repository_and_path_for_buffer_id(*buffer_id, cx)
1158            {
1159                if buffer_repo == repo {
1160                    diff.update(cx, |diff, cx| {
1161                        if let Some(conflict_set) = &diff.conflict_set {
1162                            let conflict_status_changed =
1163                                conflict_set.update(cx, |conflict_set, cx| {
1164                                    let has_conflict = merge_conflicts.contains(&repo_path);
1165                                    conflict_set.set_has_conflict(has_conflict, cx)
1166                                })?;
1167                            if conflict_status_changed {
1168                                let buffer_store = self.buffer_store.read(cx);
1169                                if let Some(buffer) = buffer_store.get(*buffer_id) {
1170                                    let _ = diff.reparse_conflict_markers(
1171                                        buffer.read(cx).text_snapshot(),
1172                                        cx,
1173                                    );
1174                                }
1175                            }
1176                        }
1177                        anyhow::Ok(())
1178                    })
1179                    .ok();
1180                }
1181            }
1182        }
1183        cx.emit(GitStoreEvent::RepositoryUpdated(
1184            id,
1185            event.clone(),
1186            self.active_repo_id == Some(id),
1187        ))
1188    }
1189
1190    fn on_jobs_updated(&mut self, _: Entity<Repository>, _: &JobsUpdated, cx: &mut Context<Self>) {
1191        cx.emit(GitStoreEvent::JobsUpdated)
1192    }
1193
1194    /// Update our list of repositories and schedule git scans in response to a notification from a worktree,
1195    fn update_repositories_from_worktree(
1196        &mut self,
1197        project_environment: Entity<ProjectEnvironment>,
1198        next_repository_id: Arc<AtomicU64>,
1199        updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
1200        updated_git_repositories: UpdatedGitRepositoriesSet,
1201        fs: Arc<dyn Fs>,
1202        cx: &mut Context<Self>,
1203    ) {
1204        let mut removed_ids = Vec::new();
1205        for update in updated_git_repositories.iter() {
1206            if let Some((id, existing)) = self.repositories.iter().find(|(_, repo)| {
1207                let existing_work_directory_abs_path =
1208                    repo.read(cx).work_directory_abs_path.clone();
1209                Some(&existing_work_directory_abs_path)
1210                    == update.old_work_directory_abs_path.as_ref()
1211                    || Some(&existing_work_directory_abs_path)
1212                        == update.new_work_directory_abs_path.as_ref()
1213            }) {
1214                if let Some(new_work_directory_abs_path) =
1215                    update.new_work_directory_abs_path.clone()
1216                {
1217                    existing.update(cx, |existing, cx| {
1218                        existing.snapshot.work_directory_abs_path = new_work_directory_abs_path;
1219                        existing.schedule_scan(updates_tx.clone(), cx);
1220                    });
1221                } else {
1222                    removed_ids.push(*id);
1223                }
1224            } else if let UpdatedGitRepository {
1225                new_work_directory_abs_path: Some(work_directory_abs_path),
1226                dot_git_abs_path: Some(dot_git_abs_path),
1227                repository_dir_abs_path: Some(repository_dir_abs_path),
1228                common_dir_abs_path: Some(common_dir_abs_path),
1229                ..
1230            } = update
1231            {
1232                let id = RepositoryId(next_repository_id.fetch_add(1, atomic::Ordering::Release));
1233                let git_store = cx.weak_entity();
1234                let repo = cx.new(|cx| {
1235                    let mut repo = Repository::local(
1236                        id,
1237                        work_directory_abs_path.clone(),
1238                        dot_git_abs_path.clone(),
1239                        repository_dir_abs_path.clone(),
1240                        common_dir_abs_path.clone(),
1241                        project_environment.downgrade(),
1242                        fs.clone(),
1243                        git_store,
1244                        cx,
1245                    );
1246                    repo.schedule_scan(updates_tx.clone(), cx);
1247                    repo
1248                });
1249                self._subscriptions
1250                    .push(cx.subscribe(&repo, Self::on_repository_event));
1251                self._subscriptions
1252                    .push(cx.subscribe(&repo, Self::on_jobs_updated));
1253                self.repositories.insert(id, repo);
1254                cx.emit(GitStoreEvent::RepositoryAdded(id));
1255                self.active_repo_id.get_or_insert_with(|| {
1256                    cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
1257                    id
1258                });
1259            }
1260        }
1261
1262        for id in removed_ids {
1263            if self.active_repo_id == Some(id) {
1264                self.active_repo_id = None;
1265                cx.emit(GitStoreEvent::ActiveRepositoryChanged(None));
1266            }
1267            self.repositories.remove(&id);
1268            if let Some(updates_tx) = updates_tx.as_ref() {
1269                updates_tx
1270                    .unbounded_send(DownstreamUpdate::RemoveRepository(id))
1271                    .ok();
1272            }
1273        }
1274    }
1275
1276    fn on_buffer_store_event(
1277        &mut self,
1278        _: Entity<BufferStore>,
1279        event: &BufferStoreEvent,
1280        cx: &mut Context<Self>,
1281    ) {
1282        match event {
1283            BufferStoreEvent::BufferAdded(buffer) => {
1284                cx.subscribe(&buffer, |this, buffer, event, cx| {
1285                    if let BufferEvent::LanguageChanged = event {
1286                        let buffer_id = buffer.read(cx).remote_id();
1287                        if let Some(diff_state) = this.diffs.get(&buffer_id) {
1288                            diff_state.update(cx, |diff_state, cx| {
1289                                diff_state.buffer_language_changed(buffer, cx);
1290                            });
1291                        }
1292                    }
1293                })
1294                .detach();
1295            }
1296            BufferStoreEvent::SharedBufferClosed(peer_id, buffer_id) => {
1297                if let Some(diffs) = self.shared_diffs.get_mut(peer_id) {
1298                    diffs.remove(buffer_id);
1299                }
1300            }
1301            BufferStoreEvent::BufferDropped(buffer_id) => {
1302                self.diffs.remove(&buffer_id);
1303                for diffs in self.shared_diffs.values_mut() {
1304                    diffs.remove(buffer_id);
1305                }
1306            }
1307
1308            _ => {}
1309        }
1310    }
1311
1312    pub fn recalculate_buffer_diffs(
1313        &mut self,
1314        buffers: Vec<Entity<Buffer>>,
1315        cx: &mut Context<Self>,
1316    ) -> impl Future<Output = ()> + use<> {
1317        let mut futures = Vec::new();
1318        for buffer in buffers {
1319            if let Some(diff_state) = self.diffs.get_mut(&buffer.read(cx).remote_id()) {
1320                let buffer = buffer.read(cx).text_snapshot();
1321                diff_state.update(cx, |diff_state, cx| {
1322                    diff_state.recalculate_diffs(buffer.clone(), cx);
1323                    futures.extend(diff_state.wait_for_recalculation().map(FutureExt::boxed));
1324                });
1325                futures.push(diff_state.update(cx, |diff_state, cx| {
1326                    diff_state
1327                        .reparse_conflict_markers(buffer, cx)
1328                        .map(|_| {})
1329                        .boxed()
1330                }));
1331            }
1332        }
1333        async move {
1334            futures::future::join_all(futures).await;
1335        }
1336    }
1337
1338    fn on_buffer_diff_event(
1339        &mut self,
1340        diff: Entity<buffer_diff::BufferDiff>,
1341        event: &BufferDiffEvent,
1342        cx: &mut Context<Self>,
1343    ) {
1344        if let BufferDiffEvent::HunksStagedOrUnstaged(new_index_text) = event {
1345            let buffer_id = diff.read(cx).buffer_id;
1346            if let Some(diff_state) = self.diffs.get(&buffer_id) {
1347                let hunk_staging_operation_count = diff_state.update(cx, |diff_state, _| {
1348                    diff_state.hunk_staging_operation_count += 1;
1349                    diff_state.hunk_staging_operation_count
1350                });
1351                if let Some((repo, path)) = self.repository_and_path_for_buffer_id(buffer_id, cx) {
1352                    let recv = repo.update(cx, |repo, cx| {
1353                        log::debug!("hunks changed for {}", path.display());
1354                        repo.spawn_set_index_text_job(
1355                            path,
1356                            new_index_text.as_ref().map(|rope| rope.to_string()),
1357                            Some(hunk_staging_operation_count),
1358                            cx,
1359                        )
1360                    });
1361                    let diff = diff.downgrade();
1362                    cx.spawn(async move |this, cx| {
1363                        if let Ok(Err(error)) = cx.background_spawn(recv).await {
1364                            diff.update(cx, |diff, cx| {
1365                                diff.clear_pending_hunks(cx);
1366                            })
1367                            .ok();
1368                            this.update(cx, |_, cx| cx.emit(GitStoreEvent::IndexWriteError(error)))
1369                                .ok();
1370                        }
1371                    })
1372                    .detach();
1373                }
1374            }
1375        }
1376    }
1377
1378    fn local_worktree_git_repos_changed(
1379        &mut self,
1380        worktree: Entity<Worktree>,
1381        changed_repos: &UpdatedGitRepositoriesSet,
1382        cx: &mut Context<Self>,
1383    ) {
1384        log::debug!("local worktree repos changed");
1385        debug_assert!(worktree.read(cx).is_local());
1386
1387        for repository in self.repositories.values() {
1388            repository.update(cx, |repository, cx| {
1389                let repo_abs_path = &repository.work_directory_abs_path;
1390                if changed_repos.iter().any(|update| {
1391                    update.old_work_directory_abs_path.as_ref() == Some(&repo_abs_path)
1392                        || update.new_work_directory_abs_path.as_ref() == Some(&repo_abs_path)
1393                }) {
1394                    repository.reload_buffer_diff_bases(cx);
1395                }
1396            });
1397        }
1398    }
1399
1400    pub fn repositories(&self) -> &HashMap<RepositoryId, Entity<Repository>> {
1401        &self.repositories
1402    }
1403
1404    pub fn status_for_buffer_id(&self, buffer_id: BufferId, cx: &App) -> Option<FileStatus> {
1405        let (repo, path) = self.repository_and_path_for_buffer_id(buffer_id, cx)?;
1406        let status = repo.read(cx).snapshot.status_for_path(&path)?;
1407        Some(status.status)
1408    }
1409
1410    pub fn repository_and_path_for_buffer_id(
1411        &self,
1412        buffer_id: BufferId,
1413        cx: &App,
1414    ) -> Option<(Entity<Repository>, RepoPath)> {
1415        let buffer = self.buffer_store.read(cx).get(buffer_id)?;
1416        let project_path = buffer.read(cx).project_path(cx)?;
1417        self.repository_and_path_for_project_path(&project_path, cx)
1418    }
1419
1420    pub fn repository_and_path_for_project_path(
1421        &self,
1422        path: &ProjectPath,
1423        cx: &App,
1424    ) -> Option<(Entity<Repository>, RepoPath)> {
1425        let abs_path = self.worktree_store.read(cx).absolutize(path, cx)?;
1426        self.repositories
1427            .values()
1428            .filter_map(|repo| {
1429                let repo_path = repo.read(cx).abs_path_to_repo_path(&abs_path)?;
1430                Some((repo.clone(), repo_path))
1431            })
1432            .max_by_key(|(repo, _)| repo.read(cx).work_directory_abs_path.clone())
1433    }
1434
1435    pub fn git_init(
1436        &self,
1437        path: Arc<Path>,
1438        fallback_branch_name: String,
1439        cx: &App,
1440    ) -> Task<Result<()>> {
1441        match &self.state {
1442            GitStoreState::Local { fs, .. } => {
1443                let fs = fs.clone();
1444                cx.background_executor()
1445                    .spawn(async move { fs.git_init(&path, fallback_branch_name) })
1446            }
1447            GitStoreState::Ssh {
1448                upstream_client,
1449                upstream_project_id: project_id,
1450                ..
1451            }
1452            | GitStoreState::Remote {
1453                upstream_client,
1454                upstream_project_id: project_id,
1455                ..
1456            } => {
1457                let client = upstream_client.clone();
1458                let project_id = *project_id;
1459                cx.background_executor().spawn(async move {
1460                    client
1461                        .request(proto::GitInit {
1462                            project_id: project_id.0,
1463                            abs_path: path.to_string_lossy().to_string(),
1464                            fallback_branch_name,
1465                        })
1466                        .await?;
1467                    Ok(())
1468                })
1469            }
1470        }
1471    }
1472
1473    async fn handle_update_repository(
1474        this: Entity<Self>,
1475        envelope: TypedEnvelope<proto::UpdateRepository>,
1476        mut cx: AsyncApp,
1477    ) -> Result<()> {
1478        this.update(&mut cx, |this, cx| {
1479            let mut update = envelope.payload;
1480
1481            let id = RepositoryId::from_proto(update.id);
1482            let client = this
1483                .upstream_client()
1484                .context("no upstream client")?
1485                .clone();
1486
1487            let mut is_new = false;
1488            let repo = this.repositories.entry(id).or_insert_with(|| {
1489                is_new = true;
1490                let git_store = cx.weak_entity();
1491                cx.new(|cx| {
1492                    Repository::remote(
1493                        id,
1494                        Path::new(&update.abs_path).into(),
1495                        ProjectId(update.project_id),
1496                        client,
1497                        git_store,
1498                        cx,
1499                    )
1500                })
1501            });
1502            if is_new {
1503                this._subscriptions
1504                    .push(cx.subscribe(&repo, Self::on_repository_event))
1505            }
1506
1507            repo.update(cx, {
1508                let update = update.clone();
1509                |repo, cx| repo.apply_remote_update(update, cx)
1510            })?;
1511
1512            this.active_repo_id.get_or_insert_with(|| {
1513                cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
1514                id
1515            });
1516
1517            if let Some((client, project_id)) = this.downstream_client() {
1518                update.project_id = project_id.to_proto();
1519                client.send(update).log_err();
1520            }
1521            Ok(())
1522        })?
1523    }
1524
1525    async fn handle_remove_repository(
1526        this: Entity<Self>,
1527        envelope: TypedEnvelope<proto::RemoveRepository>,
1528        mut cx: AsyncApp,
1529    ) -> Result<()> {
1530        this.update(&mut cx, |this, cx| {
1531            let mut update = envelope.payload;
1532            let id = RepositoryId::from_proto(update.id);
1533            this.repositories.remove(&id);
1534            if let Some((client, project_id)) = this.downstream_client() {
1535                update.project_id = project_id.to_proto();
1536                client.send(update).log_err();
1537            }
1538            if this.active_repo_id == Some(id) {
1539                this.active_repo_id = None;
1540                cx.emit(GitStoreEvent::ActiveRepositoryChanged(None));
1541            }
1542            cx.emit(GitStoreEvent::RepositoryRemoved(id));
1543        })
1544    }
1545
1546    async fn handle_git_init(
1547        this: Entity<Self>,
1548        envelope: TypedEnvelope<proto::GitInit>,
1549        cx: AsyncApp,
1550    ) -> Result<proto::Ack> {
1551        let path: Arc<Path> = PathBuf::from(envelope.payload.abs_path).into();
1552        let name = envelope.payload.fallback_branch_name;
1553        cx.update(|cx| this.read(cx).git_init(path, name, cx))?
1554            .await?;
1555
1556        Ok(proto::Ack {})
1557    }
1558
1559    async fn handle_fetch(
1560        this: Entity<Self>,
1561        envelope: TypedEnvelope<proto::Fetch>,
1562        mut cx: AsyncApp,
1563    ) -> Result<proto::RemoteMessageResponse> {
1564        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1565        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1566        let askpass_id = envelope.payload.askpass_id;
1567
1568        let askpass = make_remote_delegate(
1569            this,
1570            envelope.payload.project_id,
1571            repository_id,
1572            askpass_id,
1573            &mut cx,
1574        );
1575
1576        let remote_output = repository_handle
1577            .update(&mut cx, |repository_handle, cx| {
1578                repository_handle.fetch(askpass, cx)
1579            })?
1580            .await??;
1581
1582        Ok(proto::RemoteMessageResponse {
1583            stdout: remote_output.stdout,
1584            stderr: remote_output.stderr,
1585        })
1586    }
1587
1588    async fn handle_push(
1589        this: Entity<Self>,
1590        envelope: TypedEnvelope<proto::Push>,
1591        mut cx: AsyncApp,
1592    ) -> Result<proto::RemoteMessageResponse> {
1593        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1594        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1595
1596        let askpass_id = envelope.payload.askpass_id;
1597        let askpass = make_remote_delegate(
1598            this,
1599            envelope.payload.project_id,
1600            repository_id,
1601            askpass_id,
1602            &mut cx,
1603        );
1604
1605        let options = envelope
1606            .payload
1607            .options
1608            .as_ref()
1609            .map(|_| match envelope.payload.options() {
1610                proto::push::PushOptions::SetUpstream => git::repository::PushOptions::SetUpstream,
1611                proto::push::PushOptions::Force => git::repository::PushOptions::Force,
1612            });
1613
1614        let branch_name = envelope.payload.branch_name.into();
1615        let remote_name = envelope.payload.remote_name.into();
1616
1617        let remote_output = repository_handle
1618            .update(&mut cx, |repository_handle, cx| {
1619                repository_handle.push(branch_name, remote_name, options, askpass, cx)
1620            })?
1621            .await??;
1622        Ok(proto::RemoteMessageResponse {
1623            stdout: remote_output.stdout,
1624            stderr: remote_output.stderr,
1625        })
1626    }
1627
1628    async fn handle_pull(
1629        this: Entity<Self>,
1630        envelope: TypedEnvelope<proto::Pull>,
1631        mut cx: AsyncApp,
1632    ) -> Result<proto::RemoteMessageResponse> {
1633        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1634        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1635        let askpass_id = envelope.payload.askpass_id;
1636        let askpass = make_remote_delegate(
1637            this,
1638            envelope.payload.project_id,
1639            repository_id,
1640            askpass_id,
1641            &mut cx,
1642        );
1643
1644        let branch_name = envelope.payload.branch_name.into();
1645        let remote_name = envelope.payload.remote_name.into();
1646
1647        let remote_message = repository_handle
1648            .update(&mut cx, |repository_handle, cx| {
1649                repository_handle.pull(branch_name, remote_name, askpass, cx)
1650            })?
1651            .await??;
1652
1653        Ok(proto::RemoteMessageResponse {
1654            stdout: remote_message.stdout,
1655            stderr: remote_message.stderr,
1656        })
1657    }
1658
1659    async fn handle_stage(
1660        this: Entity<Self>,
1661        envelope: TypedEnvelope<proto::Stage>,
1662        mut cx: AsyncApp,
1663    ) -> Result<proto::Ack> {
1664        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1665        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1666
1667        let entries = envelope
1668            .payload
1669            .paths
1670            .into_iter()
1671            .map(PathBuf::from)
1672            .map(RepoPath::new)
1673            .collect();
1674
1675        repository_handle
1676            .update(&mut cx, |repository_handle, cx| {
1677                repository_handle.stage_entries(entries, cx)
1678            })?
1679            .await?;
1680        Ok(proto::Ack {})
1681    }
1682
1683    async fn handle_unstage(
1684        this: Entity<Self>,
1685        envelope: TypedEnvelope<proto::Unstage>,
1686        mut cx: AsyncApp,
1687    ) -> Result<proto::Ack> {
1688        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1689        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1690
1691        let entries = envelope
1692            .payload
1693            .paths
1694            .into_iter()
1695            .map(PathBuf::from)
1696            .map(RepoPath::new)
1697            .collect();
1698
1699        repository_handle
1700            .update(&mut cx, |repository_handle, cx| {
1701                repository_handle.unstage_entries(entries, cx)
1702            })?
1703            .await?;
1704
1705        Ok(proto::Ack {})
1706    }
1707
1708    async fn handle_set_index_text(
1709        this: Entity<Self>,
1710        envelope: TypedEnvelope<proto::SetIndexText>,
1711        mut cx: AsyncApp,
1712    ) -> Result<proto::Ack> {
1713        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1714        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1715        let repo_path = RepoPath::from_str(&envelope.payload.path);
1716
1717        repository_handle
1718            .update(&mut cx, |repository_handle, cx| {
1719                repository_handle.spawn_set_index_text_job(
1720                    repo_path,
1721                    envelope.payload.text,
1722                    None,
1723                    cx,
1724                )
1725            })?
1726            .await??;
1727        Ok(proto::Ack {})
1728    }
1729
1730    async fn handle_commit(
1731        this: Entity<Self>,
1732        envelope: TypedEnvelope<proto::Commit>,
1733        mut cx: AsyncApp,
1734    ) -> Result<proto::Ack> {
1735        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1736        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1737
1738        let message = SharedString::from(envelope.payload.message);
1739        let name = envelope.payload.name.map(SharedString::from);
1740        let email = envelope.payload.email.map(SharedString::from);
1741        let options = envelope.payload.options.unwrap_or_default();
1742
1743        repository_handle
1744            .update(&mut cx, |repository_handle, cx| {
1745                repository_handle.commit(
1746                    message,
1747                    name.zip(email),
1748                    CommitOptions {
1749                        amend: options.amend,
1750                    },
1751                    cx,
1752                )
1753            })?
1754            .await??;
1755        Ok(proto::Ack {})
1756    }
1757
1758    async fn handle_get_remotes(
1759        this: Entity<Self>,
1760        envelope: TypedEnvelope<proto::GetRemotes>,
1761        mut cx: AsyncApp,
1762    ) -> Result<proto::GetRemotesResponse> {
1763        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1764        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1765
1766        let branch_name = envelope.payload.branch_name;
1767
1768        let remotes = repository_handle
1769            .update(&mut cx, |repository_handle, _| {
1770                repository_handle.get_remotes(branch_name)
1771            })?
1772            .await??;
1773
1774        Ok(proto::GetRemotesResponse {
1775            remotes: remotes
1776                .into_iter()
1777                .map(|remotes| proto::get_remotes_response::Remote {
1778                    name: remotes.name.to_string(),
1779                })
1780                .collect::<Vec<_>>(),
1781        })
1782    }
1783
1784    async fn handle_get_branches(
1785        this: Entity<Self>,
1786        envelope: TypedEnvelope<proto::GitGetBranches>,
1787        mut cx: AsyncApp,
1788    ) -> Result<proto::GitBranchesResponse> {
1789        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1790        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1791
1792        let branches = repository_handle
1793            .update(&mut cx, |repository_handle, _| repository_handle.branches())?
1794            .await??;
1795
1796        Ok(proto::GitBranchesResponse {
1797            branches: branches
1798                .into_iter()
1799                .map(|branch| branch_to_proto(&branch))
1800                .collect::<Vec<_>>(),
1801        })
1802    }
1803    async fn handle_create_branch(
1804        this: Entity<Self>,
1805        envelope: TypedEnvelope<proto::GitCreateBranch>,
1806        mut cx: AsyncApp,
1807    ) -> Result<proto::Ack> {
1808        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1809        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1810        let branch_name = envelope.payload.branch_name;
1811
1812        repository_handle
1813            .update(&mut cx, |repository_handle, _| {
1814                repository_handle.create_branch(branch_name)
1815            })?
1816            .await??;
1817
1818        Ok(proto::Ack {})
1819    }
1820
1821    async fn handle_change_branch(
1822        this: Entity<Self>,
1823        envelope: TypedEnvelope<proto::GitChangeBranch>,
1824        mut cx: AsyncApp,
1825    ) -> Result<proto::Ack> {
1826        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1827        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1828        let branch_name = envelope.payload.branch_name;
1829
1830        repository_handle
1831            .update(&mut cx, |repository_handle, _| {
1832                repository_handle.change_branch(branch_name)
1833            })?
1834            .await??;
1835
1836        Ok(proto::Ack {})
1837    }
1838
1839    async fn handle_show(
1840        this: Entity<Self>,
1841        envelope: TypedEnvelope<proto::GitShow>,
1842        mut cx: AsyncApp,
1843    ) -> Result<proto::GitCommitDetails> {
1844        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1845        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1846
1847        let commit = repository_handle
1848            .update(&mut cx, |repository_handle, _| {
1849                repository_handle.show(envelope.payload.commit)
1850            })?
1851            .await??;
1852        Ok(proto::GitCommitDetails {
1853            sha: commit.sha.into(),
1854            message: commit.message.into(),
1855            commit_timestamp: commit.commit_timestamp,
1856            author_email: commit.author_email.into(),
1857            author_name: commit.author_name.into(),
1858        })
1859    }
1860
1861    async fn handle_load_commit_diff(
1862        this: Entity<Self>,
1863        envelope: TypedEnvelope<proto::LoadCommitDiff>,
1864        mut cx: AsyncApp,
1865    ) -> Result<proto::LoadCommitDiffResponse> {
1866        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1867        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1868
1869        let commit_diff = repository_handle
1870            .update(&mut cx, |repository_handle, _| {
1871                repository_handle.load_commit_diff(envelope.payload.commit)
1872            })?
1873            .await??;
1874        Ok(proto::LoadCommitDiffResponse {
1875            files: commit_diff
1876                .files
1877                .into_iter()
1878                .map(|file| proto::CommitFile {
1879                    path: file.path.to_string(),
1880                    old_text: file.old_text,
1881                    new_text: file.new_text,
1882                })
1883                .collect(),
1884        })
1885    }
1886
1887    async fn handle_reset(
1888        this: Entity<Self>,
1889        envelope: TypedEnvelope<proto::GitReset>,
1890        mut cx: AsyncApp,
1891    ) -> Result<proto::Ack> {
1892        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1893        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1894
1895        let mode = match envelope.payload.mode() {
1896            git_reset::ResetMode::Soft => ResetMode::Soft,
1897            git_reset::ResetMode::Mixed => ResetMode::Mixed,
1898        };
1899
1900        repository_handle
1901            .update(&mut cx, |repository_handle, cx| {
1902                repository_handle.reset(envelope.payload.commit, mode, cx)
1903            })?
1904            .await??;
1905        Ok(proto::Ack {})
1906    }
1907
1908    async fn handle_checkout_files(
1909        this: Entity<Self>,
1910        envelope: TypedEnvelope<proto::GitCheckoutFiles>,
1911        mut cx: AsyncApp,
1912    ) -> Result<proto::Ack> {
1913        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1914        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1915        let paths = envelope
1916            .payload
1917            .paths
1918            .iter()
1919            .map(|s| RepoPath::from_str(s))
1920            .collect();
1921
1922        repository_handle
1923            .update(&mut cx, |repository_handle, cx| {
1924                repository_handle.checkout_files(&envelope.payload.commit, paths, cx)
1925            })?
1926            .await??;
1927        Ok(proto::Ack {})
1928    }
1929
1930    async fn handle_open_commit_message_buffer(
1931        this: Entity<Self>,
1932        envelope: TypedEnvelope<proto::OpenCommitMessageBuffer>,
1933        mut cx: AsyncApp,
1934    ) -> Result<proto::OpenBufferResponse> {
1935        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1936        let repository = Self::repository_for_request(&this, repository_id, &mut cx)?;
1937        let buffer = repository
1938            .update(&mut cx, |repository, cx| {
1939                repository.open_commit_buffer(None, this.read(cx).buffer_store.clone(), cx)
1940            })?
1941            .await?;
1942
1943        let buffer_id = buffer.read_with(&cx, |buffer, _| buffer.remote_id())?;
1944        this.update(&mut cx, |this, cx| {
1945            this.buffer_store.update(cx, |buffer_store, cx| {
1946                buffer_store
1947                    .create_buffer_for_peer(
1948                        &buffer,
1949                        envelope.original_sender_id.unwrap_or(envelope.sender_id),
1950                        cx,
1951                    )
1952                    .detach_and_log_err(cx);
1953            })
1954        })?;
1955
1956        Ok(proto::OpenBufferResponse {
1957            buffer_id: buffer_id.to_proto(),
1958        })
1959    }
1960
1961    async fn handle_askpass(
1962        this: Entity<Self>,
1963        envelope: TypedEnvelope<proto::AskPassRequest>,
1964        mut cx: AsyncApp,
1965    ) -> Result<proto::AskPassResponse> {
1966        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1967        let repository = Self::repository_for_request(&this, repository_id, &mut cx)?;
1968
1969        let delegates = cx.update(|cx| repository.read(cx).askpass_delegates.clone())?;
1970        let Some(mut askpass) = delegates.lock().remove(&envelope.payload.askpass_id) else {
1971            debug_panic!("no askpass found");
1972            return Err(anyhow::anyhow!("no askpass found"));
1973        };
1974
1975        let response = askpass.ask_password(envelope.payload.prompt).await?;
1976
1977        delegates
1978            .lock()
1979            .insert(envelope.payload.askpass_id, askpass);
1980
1981        Ok(proto::AskPassResponse { response })
1982    }
1983
1984    async fn handle_check_for_pushed_commits(
1985        this: Entity<Self>,
1986        envelope: TypedEnvelope<proto::CheckForPushedCommits>,
1987        mut cx: AsyncApp,
1988    ) -> Result<proto::CheckForPushedCommitsResponse> {
1989        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1990        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1991
1992        let branches = repository_handle
1993            .update(&mut cx, |repository_handle, _| {
1994                repository_handle.check_for_pushed_commits()
1995            })?
1996            .await??;
1997        Ok(proto::CheckForPushedCommitsResponse {
1998            pushed_to: branches
1999                .into_iter()
2000                .map(|commit| commit.to_string())
2001                .collect(),
2002        })
2003    }
2004
2005    async fn handle_git_diff(
2006        this: Entity<Self>,
2007        envelope: TypedEnvelope<proto::GitDiff>,
2008        mut cx: AsyncApp,
2009    ) -> Result<proto::GitDiffResponse> {
2010        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2011        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2012        let diff_type = match envelope.payload.diff_type() {
2013            proto::git_diff::DiffType::HeadToIndex => DiffType::HeadToIndex,
2014            proto::git_diff::DiffType::HeadToWorktree => DiffType::HeadToWorktree,
2015        };
2016
2017        let mut diff = repository_handle
2018            .update(&mut cx, |repository_handle, cx| {
2019                repository_handle.diff(diff_type, cx)
2020            })?
2021            .await??;
2022        const ONE_MB: usize = 1_000_000;
2023        if diff.len() > ONE_MB {
2024            diff = diff.chars().take(ONE_MB).collect()
2025        }
2026
2027        Ok(proto::GitDiffResponse { diff })
2028    }
2029
2030    async fn handle_open_unstaged_diff(
2031        this: Entity<Self>,
2032        request: TypedEnvelope<proto::OpenUnstagedDiff>,
2033        mut cx: AsyncApp,
2034    ) -> Result<proto::OpenUnstagedDiffResponse> {
2035        let buffer_id = BufferId::new(request.payload.buffer_id)?;
2036        let diff = this
2037            .update(&mut cx, |this, cx| {
2038                let buffer = this.buffer_store.read(cx).get(buffer_id)?;
2039                Some(this.open_unstaged_diff(buffer, cx))
2040            })?
2041            .ok_or_else(|| anyhow!("no such buffer"))?
2042            .await?;
2043        this.update(&mut cx, |this, _| {
2044            let shared_diffs = this
2045                .shared_diffs
2046                .entry(request.original_sender_id.unwrap_or(request.sender_id))
2047                .or_default();
2048            shared_diffs.entry(buffer_id).or_default().unstaged = Some(diff.clone());
2049        })?;
2050        let staged_text = diff.read_with(&cx, |diff, _| diff.base_text_string())?;
2051        Ok(proto::OpenUnstagedDiffResponse { staged_text })
2052    }
2053
2054    async fn handle_open_uncommitted_diff(
2055        this: Entity<Self>,
2056        request: TypedEnvelope<proto::OpenUncommittedDiff>,
2057        mut cx: AsyncApp,
2058    ) -> Result<proto::OpenUncommittedDiffResponse> {
2059        let buffer_id = BufferId::new(request.payload.buffer_id)?;
2060        let diff = this
2061            .update(&mut cx, |this, cx| {
2062                let buffer = this.buffer_store.read(cx).get(buffer_id)?;
2063                Some(this.open_uncommitted_diff(buffer, cx))
2064            })?
2065            .ok_or_else(|| anyhow!("no such buffer"))?
2066            .await?;
2067        this.update(&mut cx, |this, _| {
2068            let shared_diffs = this
2069                .shared_diffs
2070                .entry(request.original_sender_id.unwrap_or(request.sender_id))
2071                .or_default();
2072            shared_diffs.entry(buffer_id).or_default().uncommitted = Some(diff.clone());
2073        })?;
2074        diff.read_with(&cx, |diff, cx| {
2075            use proto::open_uncommitted_diff_response::Mode;
2076
2077            let unstaged_diff = diff.secondary_diff();
2078            let index_snapshot = unstaged_diff.and_then(|diff| {
2079                let diff = diff.read(cx);
2080                diff.base_text_exists().then(|| diff.base_text())
2081            });
2082
2083            let mode;
2084            let staged_text;
2085            let committed_text;
2086            if diff.base_text_exists() {
2087                let committed_snapshot = diff.base_text();
2088                committed_text = Some(committed_snapshot.text());
2089                if let Some(index_text) = index_snapshot {
2090                    if index_text.remote_id() == committed_snapshot.remote_id() {
2091                        mode = Mode::IndexMatchesHead;
2092                        staged_text = None;
2093                    } else {
2094                        mode = Mode::IndexAndHead;
2095                        staged_text = Some(index_text.text());
2096                    }
2097                } else {
2098                    mode = Mode::IndexAndHead;
2099                    staged_text = None;
2100                }
2101            } else {
2102                mode = Mode::IndexAndHead;
2103                committed_text = None;
2104                staged_text = index_snapshot.as_ref().map(|buffer| buffer.text());
2105            }
2106
2107            proto::OpenUncommittedDiffResponse {
2108                committed_text,
2109                staged_text,
2110                mode: mode.into(),
2111            }
2112        })
2113    }
2114
2115    async fn handle_update_diff_bases(
2116        this: Entity<Self>,
2117        request: TypedEnvelope<proto::UpdateDiffBases>,
2118        mut cx: AsyncApp,
2119    ) -> Result<()> {
2120        let buffer_id = BufferId::new(request.payload.buffer_id)?;
2121        this.update(&mut cx, |this, cx| {
2122            if let Some(diff_state) = this.diffs.get_mut(&buffer_id) {
2123                if let Some(buffer) = this.buffer_store.read(cx).get(buffer_id) {
2124                    let buffer = buffer.read(cx).text_snapshot();
2125                    diff_state.update(cx, |diff_state, cx| {
2126                        diff_state.handle_base_texts_updated(buffer, request.payload, cx);
2127                    })
2128                }
2129            }
2130        })
2131    }
2132
2133    async fn handle_blame_buffer(
2134        this: Entity<Self>,
2135        envelope: TypedEnvelope<proto::BlameBuffer>,
2136        mut cx: AsyncApp,
2137    ) -> Result<proto::BlameBufferResponse> {
2138        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2139        let version = deserialize_version(&envelope.payload.version);
2140        let buffer = this.read_with(&cx, |this, cx| {
2141            this.buffer_store.read(cx).get_existing(buffer_id)
2142        })??;
2143        buffer
2144            .update(&mut cx, |buffer, _| {
2145                buffer.wait_for_version(version.clone())
2146            })?
2147            .await?;
2148        let blame = this
2149            .update(&mut cx, |this, cx| {
2150                this.blame_buffer(&buffer, Some(version), cx)
2151            })?
2152            .await?;
2153        Ok(serialize_blame_buffer_response(blame))
2154    }
2155
2156    async fn handle_get_permalink_to_line(
2157        this: Entity<Self>,
2158        envelope: TypedEnvelope<proto::GetPermalinkToLine>,
2159        mut cx: AsyncApp,
2160    ) -> Result<proto::GetPermalinkToLineResponse> {
2161        let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2162        // let version = deserialize_version(&envelope.payload.version);
2163        let selection = {
2164            let proto_selection = envelope
2165                .payload
2166                .selection
2167                .context("no selection to get permalink for defined")?;
2168            proto_selection.start as u32..proto_selection.end as u32
2169        };
2170        let buffer = this.read_with(&cx, |this, cx| {
2171            this.buffer_store.read(cx).get_existing(buffer_id)
2172        })??;
2173        let permalink = this
2174            .update(&mut cx, |this, cx| {
2175                this.get_permalink_to_line(&buffer, selection, cx)
2176            })?
2177            .await?;
2178        Ok(proto::GetPermalinkToLineResponse {
2179            permalink: permalink.to_string(),
2180        })
2181    }
2182
2183    fn repository_for_request(
2184        this: &Entity<Self>,
2185        id: RepositoryId,
2186        cx: &mut AsyncApp,
2187    ) -> Result<Entity<Repository>> {
2188        this.update(cx, |this, _| {
2189            this.repositories
2190                .get(&id)
2191                .context("missing repository handle")
2192                .cloned()
2193        })?
2194    }
2195
2196    pub fn repo_snapshots(&self, cx: &App) -> HashMap<RepositoryId, RepositorySnapshot> {
2197        self.repositories
2198            .iter()
2199            .map(|(id, repo)| (*id, repo.read(cx).snapshot.clone()))
2200            .collect()
2201    }
2202}
2203
2204impl BufferGitState {
2205    fn new(_git_store: WeakEntity<GitStore>) -> Self {
2206        Self {
2207            unstaged_diff: Default::default(),
2208            uncommitted_diff: Default::default(),
2209            recalculate_diff_task: Default::default(),
2210            language: Default::default(),
2211            language_registry: Default::default(),
2212            recalculating_tx: postage::watch::channel_with(false).0,
2213            hunk_staging_operation_count: 0,
2214            hunk_staging_operation_count_as_of_write: 0,
2215            head_text: Default::default(),
2216            index_text: Default::default(),
2217            head_changed: Default::default(),
2218            index_changed: Default::default(),
2219            language_changed: Default::default(),
2220            conflict_updated_futures: Default::default(),
2221            conflict_set: Default::default(),
2222            reparse_conflict_markers_task: Default::default(),
2223        }
2224    }
2225
2226    fn buffer_language_changed(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) {
2227        self.language = buffer.read(cx).language().cloned();
2228        self.language_changed = true;
2229        let _ = self.recalculate_diffs(buffer.read(cx).text_snapshot(), cx);
2230    }
2231
2232    fn reparse_conflict_markers(
2233        &mut self,
2234        buffer: text::BufferSnapshot,
2235        cx: &mut Context<Self>,
2236    ) -> oneshot::Receiver<()> {
2237        let (tx, rx) = oneshot::channel();
2238
2239        let Some(conflict_set) = self
2240            .conflict_set
2241            .as_ref()
2242            .and_then(|conflict_set| conflict_set.upgrade())
2243        else {
2244            return rx;
2245        };
2246
2247        let old_snapshot = conflict_set.read_with(cx, |conflict_set, _| {
2248            if conflict_set.has_conflict {
2249                Some(conflict_set.snapshot())
2250            } else {
2251                None
2252            }
2253        });
2254
2255        if let Some(old_snapshot) = old_snapshot {
2256            self.conflict_updated_futures.push(tx);
2257            self.reparse_conflict_markers_task = Some(cx.spawn(async move |this, cx| {
2258                let (snapshot, changed_range) = cx
2259                    .background_spawn(async move {
2260                        let new_snapshot = ConflictSet::parse(&buffer);
2261                        let changed_range = old_snapshot.compare(&new_snapshot, &buffer);
2262                        (new_snapshot, changed_range)
2263                    })
2264                    .await;
2265                this.update(cx, |this, cx| {
2266                    if let Some(conflict_set) = &this.conflict_set {
2267                        conflict_set
2268                            .update(cx, |conflict_set, cx| {
2269                                conflict_set.set_snapshot(snapshot, changed_range, cx);
2270                            })
2271                            .ok();
2272                    }
2273                    let futures = std::mem::take(&mut this.conflict_updated_futures);
2274                    for tx in futures {
2275                        tx.send(()).ok();
2276                    }
2277                })
2278            }))
2279        }
2280
2281        rx
2282    }
2283
2284    fn unstaged_diff(&self) -> Option<Entity<BufferDiff>> {
2285        self.unstaged_diff.as_ref().and_then(|set| set.upgrade())
2286    }
2287
2288    fn uncommitted_diff(&self) -> Option<Entity<BufferDiff>> {
2289        self.uncommitted_diff.as_ref().and_then(|set| set.upgrade())
2290    }
2291
2292    fn handle_base_texts_updated(
2293        &mut self,
2294        buffer: text::BufferSnapshot,
2295        message: proto::UpdateDiffBases,
2296        cx: &mut Context<Self>,
2297    ) {
2298        use proto::update_diff_bases::Mode;
2299
2300        let Some(mode) = Mode::from_i32(message.mode) else {
2301            return;
2302        };
2303
2304        let diff_bases_change = match mode {
2305            Mode::HeadOnly => DiffBasesChange::SetHead(message.committed_text),
2306            Mode::IndexOnly => DiffBasesChange::SetIndex(message.staged_text),
2307            Mode::IndexMatchesHead => DiffBasesChange::SetBoth(message.committed_text),
2308            Mode::IndexAndHead => DiffBasesChange::SetEach {
2309                index: message.staged_text,
2310                head: message.committed_text,
2311            },
2312        };
2313
2314        self.diff_bases_changed(buffer, Some(diff_bases_change), cx);
2315    }
2316
2317    pub fn wait_for_recalculation(&mut self) -> Option<impl Future<Output = ()> + use<>> {
2318        if *self.recalculating_tx.borrow() {
2319            let mut rx = self.recalculating_tx.subscribe();
2320            return Some(async move {
2321                loop {
2322                    let is_recalculating = rx.recv().await;
2323                    if is_recalculating != Some(true) {
2324                        break;
2325                    }
2326                }
2327            });
2328        } else {
2329            None
2330        }
2331    }
2332
2333    fn diff_bases_changed(
2334        &mut self,
2335        buffer: text::BufferSnapshot,
2336        diff_bases_change: Option<DiffBasesChange>,
2337        cx: &mut Context<Self>,
2338    ) {
2339        match diff_bases_change {
2340            Some(DiffBasesChange::SetIndex(index)) => {
2341                self.index_text = index.map(|mut index| {
2342                    text::LineEnding::normalize(&mut index);
2343                    Arc::new(index)
2344                });
2345                self.index_changed = true;
2346            }
2347            Some(DiffBasesChange::SetHead(head)) => {
2348                self.head_text = head.map(|mut head| {
2349                    text::LineEnding::normalize(&mut head);
2350                    Arc::new(head)
2351                });
2352                self.head_changed = true;
2353            }
2354            Some(DiffBasesChange::SetBoth(text)) => {
2355                let text = text.map(|mut text| {
2356                    text::LineEnding::normalize(&mut text);
2357                    Arc::new(text)
2358                });
2359                self.head_text = text.clone();
2360                self.index_text = text;
2361                self.head_changed = true;
2362                self.index_changed = true;
2363            }
2364            Some(DiffBasesChange::SetEach { index, head }) => {
2365                self.index_text = index.map(|mut index| {
2366                    text::LineEnding::normalize(&mut index);
2367                    Arc::new(index)
2368                });
2369                self.index_changed = true;
2370                self.head_text = head.map(|mut head| {
2371                    text::LineEnding::normalize(&mut head);
2372                    Arc::new(head)
2373                });
2374                self.head_changed = true;
2375            }
2376            None => {}
2377        }
2378
2379        self.recalculate_diffs(buffer, cx)
2380    }
2381
2382    fn recalculate_diffs(&mut self, buffer: text::BufferSnapshot, cx: &mut Context<Self>) {
2383        *self.recalculating_tx.borrow_mut() = true;
2384
2385        let language = self.language.clone();
2386        let language_registry = self.language_registry.clone();
2387        let unstaged_diff = self.unstaged_diff();
2388        let uncommitted_diff = self.uncommitted_diff();
2389        let head = self.head_text.clone();
2390        let index = self.index_text.clone();
2391        let index_changed = self.index_changed;
2392        let head_changed = self.head_changed;
2393        let language_changed = self.language_changed;
2394        let prev_hunk_staging_operation_count = self.hunk_staging_operation_count_as_of_write;
2395        let index_matches_head = match (self.index_text.as_ref(), self.head_text.as_ref()) {
2396            (Some(index), Some(head)) => Arc::ptr_eq(index, head),
2397            (None, None) => true,
2398            _ => false,
2399        };
2400        self.recalculate_diff_task = Some(cx.spawn(async move |this, cx| {
2401            log::debug!(
2402                "start recalculating diffs for buffer {}",
2403                buffer.remote_id()
2404            );
2405
2406            let mut new_unstaged_diff = None;
2407            if let Some(unstaged_diff) = &unstaged_diff {
2408                new_unstaged_diff = Some(
2409                    BufferDiff::update_diff(
2410                        unstaged_diff.clone(),
2411                        buffer.clone(),
2412                        index,
2413                        index_changed,
2414                        language_changed,
2415                        language.clone(),
2416                        language_registry.clone(),
2417                        cx,
2418                    )
2419                    .await?,
2420                );
2421            }
2422
2423            let mut new_uncommitted_diff = None;
2424            if let Some(uncommitted_diff) = &uncommitted_diff {
2425                new_uncommitted_diff = if index_matches_head {
2426                    new_unstaged_diff.clone()
2427                } else {
2428                    Some(
2429                        BufferDiff::update_diff(
2430                            uncommitted_diff.clone(),
2431                            buffer.clone(),
2432                            head,
2433                            head_changed,
2434                            language_changed,
2435                            language.clone(),
2436                            language_registry.clone(),
2437                            cx,
2438                        )
2439                        .await?,
2440                    )
2441                }
2442            }
2443
2444            let cancel = this.update(cx, |this, _| {
2445                // This checks whether all pending stage/unstage operations
2446                // have quiesced (i.e. both the corresponding write and the
2447                // read of that write have completed). If not, then we cancel
2448                // this recalculation attempt to avoid invalidating pending
2449                // state too quickly; another recalculation will come along
2450                // later and clear the pending state once the state of the index has settled.
2451                if this.hunk_staging_operation_count > prev_hunk_staging_operation_count {
2452                    *this.recalculating_tx.borrow_mut() = false;
2453                    true
2454                } else {
2455                    false
2456                }
2457            })?;
2458            if cancel {
2459                log::debug!(
2460                    concat!(
2461                        "aborting recalculating diffs for buffer {}",
2462                        "due to subsequent hunk operations",
2463                    ),
2464                    buffer.remote_id()
2465                );
2466                return Ok(());
2467            }
2468
2469            let unstaged_changed_range = if let Some((unstaged_diff, new_unstaged_diff)) =
2470                unstaged_diff.as_ref().zip(new_unstaged_diff.clone())
2471            {
2472                unstaged_diff.update(cx, |diff, cx| {
2473                    if language_changed {
2474                        diff.language_changed(cx);
2475                    }
2476                    diff.set_snapshot(new_unstaged_diff, &buffer, cx)
2477                })?
2478            } else {
2479                None
2480            };
2481
2482            if let Some((uncommitted_diff, new_uncommitted_diff)) =
2483                uncommitted_diff.as_ref().zip(new_uncommitted_diff.clone())
2484            {
2485                uncommitted_diff.update(cx, |diff, cx| {
2486                    if language_changed {
2487                        diff.language_changed(cx);
2488                    }
2489                    diff.set_snapshot_with_secondary(
2490                        new_uncommitted_diff,
2491                        &buffer,
2492                        unstaged_changed_range,
2493                        true,
2494                        cx,
2495                    );
2496                })?;
2497            }
2498
2499            log::debug!(
2500                "finished recalculating diffs for buffer {}",
2501                buffer.remote_id()
2502            );
2503
2504            if let Some(this) = this.upgrade() {
2505                this.update(cx, |this, _| {
2506                    this.index_changed = false;
2507                    this.head_changed = false;
2508                    this.language_changed = false;
2509                    *this.recalculating_tx.borrow_mut() = false;
2510                })?;
2511            }
2512
2513            Ok(())
2514        }));
2515    }
2516}
2517
2518fn make_remote_delegate(
2519    this: Entity<GitStore>,
2520    project_id: u64,
2521    repository_id: RepositoryId,
2522    askpass_id: u64,
2523    cx: &mut AsyncApp,
2524) -> AskPassDelegate {
2525    AskPassDelegate::new(cx, move |prompt, tx, cx| {
2526        this.update(cx, |this, cx| {
2527            let Some((client, _)) = this.downstream_client() else {
2528                return;
2529            };
2530            let response = client.request(proto::AskPassRequest {
2531                project_id,
2532                repository_id: repository_id.to_proto(),
2533                askpass_id,
2534                prompt,
2535            });
2536            cx.spawn(async move |_, _| {
2537                tx.send(response.await?.response).ok();
2538                anyhow::Ok(())
2539            })
2540            .detach_and_log_err(cx);
2541        })
2542        .log_err();
2543    })
2544}
2545
2546impl RepositoryId {
2547    pub fn to_proto(self) -> u64 {
2548        self.0
2549    }
2550
2551    pub fn from_proto(id: u64) -> Self {
2552        RepositoryId(id)
2553    }
2554}
2555
2556impl RepositorySnapshot {
2557    fn empty(id: RepositoryId, work_directory_abs_path: Arc<Path>) -> Self {
2558        Self {
2559            id,
2560            statuses_by_path: Default::default(),
2561            work_directory_abs_path,
2562            branch: None,
2563            head_commit: None,
2564            scan_id: 0,
2565            merge: Default::default(),
2566        }
2567    }
2568
2569    fn initial_update(&self, project_id: u64) -> proto::UpdateRepository {
2570        proto::UpdateRepository {
2571            branch_summary: self.branch.as_ref().map(branch_to_proto),
2572            head_commit_details: self.head_commit.as_ref().map(commit_details_to_proto),
2573            updated_statuses: self
2574                .statuses_by_path
2575                .iter()
2576                .map(|entry| entry.to_proto())
2577                .collect(),
2578            removed_statuses: Default::default(),
2579            current_merge_conflicts: self
2580                .merge
2581                .conflicted_paths
2582                .iter()
2583                .map(|repo_path| repo_path.to_proto())
2584                .collect(),
2585            project_id,
2586            id: self.id.to_proto(),
2587            abs_path: self.work_directory_abs_path.to_proto(),
2588            entry_ids: vec![self.id.to_proto()],
2589            scan_id: self.scan_id,
2590            is_last_update: true,
2591        }
2592    }
2593
2594    fn build_update(&self, old: &Self, project_id: u64) -> proto::UpdateRepository {
2595        let mut updated_statuses: Vec<proto::StatusEntry> = Vec::new();
2596        let mut removed_statuses: Vec<String> = Vec::new();
2597
2598        let mut new_statuses = self.statuses_by_path.iter().peekable();
2599        let mut old_statuses = old.statuses_by_path.iter().peekable();
2600
2601        let mut current_new_entry = new_statuses.next();
2602        let mut current_old_entry = old_statuses.next();
2603        loop {
2604            match (current_new_entry, current_old_entry) {
2605                (Some(new_entry), Some(old_entry)) => {
2606                    match new_entry.repo_path.cmp(&old_entry.repo_path) {
2607                        Ordering::Less => {
2608                            updated_statuses.push(new_entry.to_proto());
2609                            current_new_entry = new_statuses.next();
2610                        }
2611                        Ordering::Equal => {
2612                            if new_entry.status != old_entry.status {
2613                                updated_statuses.push(new_entry.to_proto());
2614                            }
2615                            current_old_entry = old_statuses.next();
2616                            current_new_entry = new_statuses.next();
2617                        }
2618                        Ordering::Greater => {
2619                            removed_statuses.push(old_entry.repo_path.as_ref().to_proto());
2620                            current_old_entry = old_statuses.next();
2621                        }
2622                    }
2623                }
2624                (None, Some(old_entry)) => {
2625                    removed_statuses.push(old_entry.repo_path.as_ref().to_proto());
2626                    current_old_entry = old_statuses.next();
2627                }
2628                (Some(new_entry), None) => {
2629                    updated_statuses.push(new_entry.to_proto());
2630                    current_new_entry = new_statuses.next();
2631                }
2632                (None, None) => break,
2633            }
2634        }
2635
2636        proto::UpdateRepository {
2637            branch_summary: self.branch.as_ref().map(branch_to_proto),
2638            head_commit_details: self.head_commit.as_ref().map(commit_details_to_proto),
2639            updated_statuses,
2640            removed_statuses,
2641            current_merge_conflicts: self
2642                .merge
2643                .conflicted_paths
2644                .iter()
2645                .map(|path| path.as_ref().to_proto())
2646                .collect(),
2647            project_id,
2648            id: self.id.to_proto(),
2649            abs_path: self.work_directory_abs_path.to_proto(),
2650            entry_ids: vec![],
2651            scan_id: self.scan_id,
2652            is_last_update: true,
2653        }
2654    }
2655
2656    pub fn status(&self) -> impl Iterator<Item = StatusEntry> + '_ {
2657        self.statuses_by_path.iter().cloned()
2658    }
2659
2660    pub fn status_summary(&self) -> GitSummary {
2661        self.statuses_by_path.summary().item_summary
2662    }
2663
2664    pub fn status_for_path(&self, path: &RepoPath) -> Option<StatusEntry> {
2665        self.statuses_by_path
2666            .get(&PathKey(path.0.clone()), &())
2667            .cloned()
2668    }
2669
2670    pub fn abs_path_to_repo_path(&self, abs_path: &Path) -> Option<RepoPath> {
2671        abs_path
2672            .strip_prefix(&self.work_directory_abs_path)
2673            .map(RepoPath::from)
2674            .ok()
2675    }
2676
2677    pub fn has_conflict(&self, repo_path: &RepoPath) -> bool {
2678        self.merge.conflicted_paths.contains(repo_path)
2679    }
2680
2681    /// This is the name that will be displayed in the repository selector for this repository.
2682    pub fn display_name(&self) -> SharedString {
2683        self.work_directory_abs_path
2684            .file_name()
2685            .unwrap_or_default()
2686            .to_string_lossy()
2687            .to_string()
2688            .into()
2689    }
2690}
2691
2692impl MergeDetails {
2693    async fn load(
2694        backend: &Arc<dyn GitRepository>,
2695        status: &SumTree<StatusEntry>,
2696        prev_snapshot: &RepositorySnapshot,
2697    ) -> Result<(MergeDetails, bool)> {
2698        fn sha_eq<'a>(
2699            l: impl IntoIterator<Item = &'a CommitDetails>,
2700            r: impl IntoIterator<Item = &'a CommitDetails>,
2701        ) -> bool {
2702            l.into_iter()
2703                .map(|commit| &commit.sha)
2704                .eq(r.into_iter().map(|commit| &commit.sha))
2705        }
2706
2707        let merge_heads = try_join_all(
2708            backend
2709                .merge_head_shas()
2710                .into_iter()
2711                .map(|sha| backend.show(sha)),
2712        )
2713        .await?;
2714        let cherry_pick_head = backend.show("CHERRY_PICK_HEAD".into()).await.ok();
2715        let rebase_head = backend.show("REBASE_HEAD".into()).await.ok();
2716        let revert_head = backend.show("REVERT_HEAD".into()).await.ok();
2717        let apply_head = backend.show("APPLY_HEAD".into()).await.ok();
2718        let message = backend.merge_message().await.map(SharedString::from);
2719        let merge_heads_changed = !sha_eq(
2720            merge_heads.as_slice(),
2721            prev_snapshot.merge.merge_heads.as_slice(),
2722        ) || !sha_eq(
2723            cherry_pick_head.as_ref(),
2724            prev_snapshot.merge.cherry_pick_head.as_ref(),
2725        ) || !sha_eq(
2726            apply_head.as_ref(),
2727            prev_snapshot.merge.apply_head.as_ref(),
2728        ) || !sha_eq(
2729            rebase_head.as_ref(),
2730            prev_snapshot.merge.rebase_head.as_ref(),
2731        ) || !sha_eq(
2732            revert_head.as_ref(),
2733            prev_snapshot.merge.revert_head.as_ref(),
2734        );
2735        let conflicted_paths = if merge_heads_changed {
2736            TreeSet::from_ordered_entries(
2737                status
2738                    .iter()
2739                    .filter(|entry| entry.status.is_conflicted())
2740                    .map(|entry| entry.repo_path.clone()),
2741            )
2742        } else {
2743            prev_snapshot.merge.conflicted_paths.clone()
2744        };
2745        let details = MergeDetails {
2746            conflicted_paths,
2747            message,
2748            apply_head,
2749            cherry_pick_head,
2750            merge_heads,
2751            rebase_head,
2752            revert_head,
2753        };
2754        Ok((details, merge_heads_changed))
2755    }
2756}
2757
2758impl Repository {
2759    pub fn snapshot(&self) -> RepositorySnapshot {
2760        self.snapshot.clone()
2761    }
2762
2763    fn local(
2764        id: RepositoryId,
2765        work_directory_abs_path: Arc<Path>,
2766        dot_git_abs_path: Arc<Path>,
2767        repository_dir_abs_path: Arc<Path>,
2768        common_dir_abs_path: Arc<Path>,
2769        project_environment: WeakEntity<ProjectEnvironment>,
2770        fs: Arc<dyn Fs>,
2771        git_store: WeakEntity<GitStore>,
2772        cx: &mut Context<Self>,
2773    ) -> Self {
2774        let snapshot = RepositorySnapshot::empty(id, work_directory_abs_path.clone());
2775        Repository {
2776            this: cx.weak_entity(),
2777            git_store,
2778            snapshot,
2779            commit_message_buffer: None,
2780            askpass_delegates: Default::default(),
2781            paths_needing_status_update: Default::default(),
2782            latest_askpass_id: 0,
2783            job_sender: Repository::spawn_local_git_worker(
2784                work_directory_abs_path,
2785                dot_git_abs_path,
2786                repository_dir_abs_path,
2787                common_dir_abs_path,
2788                project_environment,
2789                fs,
2790                cx,
2791            ),
2792            job_id: 0,
2793            active_jobs: Default::default(),
2794        }
2795    }
2796
2797    fn remote(
2798        id: RepositoryId,
2799        work_directory_abs_path: Arc<Path>,
2800        project_id: ProjectId,
2801        client: AnyProtoClient,
2802        git_store: WeakEntity<GitStore>,
2803        cx: &mut Context<Self>,
2804    ) -> Self {
2805        let snapshot = RepositorySnapshot::empty(id, work_directory_abs_path);
2806        Self {
2807            this: cx.weak_entity(),
2808            snapshot,
2809            commit_message_buffer: None,
2810            git_store,
2811            paths_needing_status_update: Default::default(),
2812            job_sender: Self::spawn_remote_git_worker(project_id, client, cx),
2813            askpass_delegates: Default::default(),
2814            latest_askpass_id: 0,
2815            active_jobs: Default::default(),
2816            job_id: 0,
2817        }
2818    }
2819
2820    pub fn git_store(&self) -> Option<Entity<GitStore>> {
2821        self.git_store.upgrade()
2822    }
2823
2824    fn reload_buffer_diff_bases(&mut self, cx: &mut Context<Self>) {
2825        let this = cx.weak_entity();
2826        let git_store = self.git_store.clone();
2827        let _ = self.send_keyed_job(
2828            Some(GitJobKey::ReloadBufferDiffBases),
2829            None,
2830            |state, mut cx| async move {
2831                let RepositoryState::Local { backend, .. } = state else {
2832                    log::error!("tried to recompute diffs for a non-local repository");
2833                    return Ok(());
2834                };
2835
2836                let Some(this) = this.upgrade() else {
2837                    return Ok(());
2838                };
2839
2840                let repo_diff_state_updates = this.update(&mut cx, |this, cx| {
2841                    git_store.update(cx, |git_store, cx| {
2842                        git_store
2843                            .diffs
2844                            .iter()
2845                            .filter_map(|(buffer_id, diff_state)| {
2846                                let buffer_store = git_store.buffer_store.read(cx);
2847                                let buffer = buffer_store.get(*buffer_id)?;
2848                                let file = File::from_dyn(buffer.read(cx).file())?;
2849                                let abs_path =
2850                                    file.worktree.read(cx).absolutize(&file.path).ok()?;
2851                                let repo_path = this.abs_path_to_repo_path(&abs_path)?;
2852                                log::debug!(
2853                                    "start reload diff bases for repo path {}",
2854                                    repo_path.0.display()
2855                                );
2856                                diff_state.update(cx, |diff_state, _| {
2857                                    let has_unstaged_diff = diff_state
2858                                        .unstaged_diff
2859                                        .as_ref()
2860                                        .is_some_and(|diff| diff.is_upgradable());
2861                                    let has_uncommitted_diff = diff_state
2862                                        .uncommitted_diff
2863                                        .as_ref()
2864                                        .is_some_and(|set| set.is_upgradable());
2865
2866                                    Some((
2867                                        buffer,
2868                                        repo_path,
2869                                        has_unstaged_diff.then(|| diff_state.index_text.clone()),
2870                                        has_uncommitted_diff.then(|| diff_state.head_text.clone()),
2871                                    ))
2872                                })
2873                            })
2874                            .collect::<Vec<_>>()
2875                    })
2876                })??;
2877
2878                let buffer_diff_base_changes = cx
2879                    .background_spawn(async move {
2880                        let mut changes = Vec::new();
2881                        for (buffer, repo_path, current_index_text, current_head_text) in
2882                            &repo_diff_state_updates
2883                        {
2884                            let index_text = if current_index_text.is_some() {
2885                                backend.load_index_text(repo_path.clone()).await
2886                            } else {
2887                                None
2888                            };
2889                            let head_text = if current_head_text.is_some() {
2890                                backend.load_committed_text(repo_path.clone()).await
2891                            } else {
2892                                None
2893                            };
2894
2895                            let change =
2896                                match (current_index_text.as_ref(), current_head_text.as_ref()) {
2897                                    (Some(current_index), Some(current_head)) => {
2898                                        let index_changed =
2899                                            index_text.as_ref() != current_index.as_deref();
2900                                        let head_changed =
2901                                            head_text.as_ref() != current_head.as_deref();
2902                                        if index_changed && head_changed {
2903                                            if index_text == head_text {
2904                                                Some(DiffBasesChange::SetBoth(head_text))
2905                                            } else {
2906                                                Some(DiffBasesChange::SetEach {
2907                                                    index: index_text,
2908                                                    head: head_text,
2909                                                })
2910                                            }
2911                                        } else if index_changed {
2912                                            Some(DiffBasesChange::SetIndex(index_text))
2913                                        } else if head_changed {
2914                                            Some(DiffBasesChange::SetHead(head_text))
2915                                        } else {
2916                                            None
2917                                        }
2918                                    }
2919                                    (Some(current_index), None) => {
2920                                        let index_changed =
2921                                            index_text.as_ref() != current_index.as_deref();
2922                                        index_changed
2923                                            .then_some(DiffBasesChange::SetIndex(index_text))
2924                                    }
2925                                    (None, Some(current_head)) => {
2926                                        let head_changed =
2927                                            head_text.as_ref() != current_head.as_deref();
2928                                        head_changed.then_some(DiffBasesChange::SetHead(head_text))
2929                                    }
2930                                    (None, None) => None,
2931                                };
2932
2933                            changes.push((buffer.clone(), change))
2934                        }
2935                        changes
2936                    })
2937                    .await;
2938
2939                git_store.update(&mut cx, |git_store, cx| {
2940                    for (buffer, diff_bases_change) in buffer_diff_base_changes {
2941                        let buffer_snapshot = buffer.read(cx).text_snapshot();
2942                        let buffer_id = buffer_snapshot.remote_id();
2943                        let Some(diff_state) = git_store.diffs.get(&buffer_id) else {
2944                            continue;
2945                        };
2946
2947                        let downstream_client = git_store.downstream_client();
2948                        diff_state.update(cx, |diff_state, cx| {
2949                            use proto::update_diff_bases::Mode;
2950
2951                            if let Some((diff_bases_change, (client, project_id))) =
2952                                diff_bases_change.clone().zip(downstream_client)
2953                            {
2954                                let (staged_text, committed_text, mode) = match diff_bases_change {
2955                                    DiffBasesChange::SetIndex(index) => {
2956                                        (index, None, Mode::IndexOnly)
2957                                    }
2958                                    DiffBasesChange::SetHead(head) => (None, head, Mode::HeadOnly),
2959                                    DiffBasesChange::SetEach { index, head } => {
2960                                        (index, head, Mode::IndexAndHead)
2961                                    }
2962                                    DiffBasesChange::SetBoth(text) => {
2963                                        (None, text, Mode::IndexMatchesHead)
2964                                    }
2965                                };
2966                                client
2967                                    .send(proto::UpdateDiffBases {
2968                                        project_id: project_id.to_proto(),
2969                                        buffer_id: buffer_id.to_proto(),
2970                                        staged_text,
2971                                        committed_text,
2972                                        mode: mode as i32,
2973                                    })
2974                                    .log_err();
2975                            }
2976
2977                            diff_state.diff_bases_changed(buffer_snapshot, diff_bases_change, cx);
2978                        });
2979                    }
2980                })
2981            },
2982        );
2983    }
2984
2985    pub fn send_job<F, Fut, R>(
2986        &mut self,
2987        status: Option<SharedString>,
2988        job: F,
2989    ) -> oneshot::Receiver<R>
2990    where
2991        F: FnOnce(RepositoryState, AsyncApp) -> Fut + 'static,
2992        Fut: Future<Output = R> + 'static,
2993        R: Send + 'static,
2994    {
2995        self.send_keyed_job(None, status, job)
2996    }
2997
2998    fn send_keyed_job<F, Fut, R>(
2999        &mut self,
3000        key: Option<GitJobKey>,
3001        status: Option<SharedString>,
3002        job: F,
3003    ) -> oneshot::Receiver<R>
3004    where
3005        F: FnOnce(RepositoryState, AsyncApp) -> Fut + 'static,
3006        Fut: Future<Output = R> + 'static,
3007        R: Send + 'static,
3008    {
3009        let (result_tx, result_rx) = futures::channel::oneshot::channel();
3010        let job_id = post_inc(&mut self.job_id);
3011        let this = self.this.clone();
3012        self.job_sender
3013            .unbounded_send(GitJob {
3014                key,
3015                job: Box::new(move |state, cx: &mut AsyncApp| {
3016                    let job = job(state, cx.clone());
3017                    cx.spawn(async move |cx| {
3018                        if let Some(s) = status.clone() {
3019                            this.update(cx, |this, cx| {
3020                                this.active_jobs.insert(
3021                                    job_id,
3022                                    JobInfo {
3023                                        start: Instant::now(),
3024                                        message: s.clone(),
3025                                    },
3026                                );
3027
3028                                cx.notify();
3029                            })
3030                            .ok();
3031                        }
3032                        let result = job.await;
3033
3034                        this.update(cx, |this, cx| {
3035                            this.active_jobs.remove(&job_id);
3036                            cx.notify();
3037                        })
3038                        .ok();
3039
3040                        result_tx.send(result).ok();
3041                    })
3042                }),
3043            })
3044            .ok();
3045        result_rx
3046    }
3047
3048    pub fn set_as_active_repository(&self, cx: &mut Context<Self>) {
3049        let Some(git_store) = self.git_store.upgrade() else {
3050            return;
3051        };
3052        let entity = cx.entity();
3053        git_store.update(cx, |git_store, cx| {
3054            let Some((&id, _)) = git_store
3055                .repositories
3056                .iter()
3057                .find(|(_, handle)| *handle == &entity)
3058            else {
3059                return;
3060            };
3061            git_store.active_repo_id = Some(id);
3062            cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
3063        });
3064    }
3065
3066    pub fn cached_status(&self) -> impl '_ + Iterator<Item = StatusEntry> {
3067        self.snapshot.status()
3068    }
3069
3070    pub fn repo_path_to_project_path(&self, path: &RepoPath, cx: &App) -> Option<ProjectPath> {
3071        let git_store = self.git_store.upgrade()?;
3072        let worktree_store = git_store.read(cx).worktree_store.read(cx);
3073        let abs_path = self.snapshot.work_directory_abs_path.join(&path.0);
3074        let (worktree, relative_path) = worktree_store.find_worktree(abs_path, cx)?;
3075        Some(ProjectPath {
3076            worktree_id: worktree.read(cx).id(),
3077            path: relative_path.into(),
3078        })
3079    }
3080
3081    pub fn project_path_to_repo_path(&self, path: &ProjectPath, cx: &App) -> Option<RepoPath> {
3082        let git_store = self.git_store.upgrade()?;
3083        let worktree_store = git_store.read(cx).worktree_store.read(cx);
3084        let abs_path = worktree_store.absolutize(path, cx)?;
3085        self.snapshot.abs_path_to_repo_path(&abs_path)
3086    }
3087
3088    pub fn contains_sub_repo(&self, other: &Entity<Self>, cx: &App) -> bool {
3089        other
3090            .read(cx)
3091            .snapshot
3092            .work_directory_abs_path
3093            .starts_with(&self.snapshot.work_directory_abs_path)
3094    }
3095
3096    pub fn open_commit_buffer(
3097        &mut self,
3098        languages: Option<Arc<LanguageRegistry>>,
3099        buffer_store: Entity<BufferStore>,
3100        cx: &mut Context<Self>,
3101    ) -> Task<Result<Entity<Buffer>>> {
3102        let id = self.id;
3103        if let Some(buffer) = self.commit_message_buffer.clone() {
3104            return Task::ready(Ok(buffer));
3105        }
3106        let this = cx.weak_entity();
3107
3108        let rx = self.send_job(None, move |state, mut cx| async move {
3109            let Some(this) = this.upgrade() else {
3110                bail!("git store was dropped");
3111            };
3112            match state {
3113                RepositoryState::Local { .. } => {
3114                    this.update(&mut cx, |_, cx| {
3115                        Self::open_local_commit_buffer(languages, buffer_store, cx)
3116                    })?
3117                    .await
3118                }
3119                RepositoryState::Remote { project_id, client } => {
3120                    let request = client.request(proto::OpenCommitMessageBuffer {
3121                        project_id: project_id.0,
3122                        repository_id: id.to_proto(),
3123                    });
3124                    let response = request.await.context("requesting to open commit buffer")?;
3125                    let buffer_id = BufferId::new(response.buffer_id)?;
3126                    let buffer = buffer_store
3127                        .update(&mut cx, |buffer_store, cx| {
3128                            buffer_store.wait_for_remote_buffer(buffer_id, cx)
3129                        })?
3130                        .await?;
3131                    if let Some(language_registry) = languages {
3132                        let git_commit_language =
3133                            language_registry.language_for_name("Git Commit").await?;
3134                        buffer.update(&mut cx, |buffer, cx| {
3135                            buffer.set_language(Some(git_commit_language), cx);
3136                        })?;
3137                    }
3138                    this.update(&mut cx, |this, _| {
3139                        this.commit_message_buffer = Some(buffer.clone());
3140                    })?;
3141                    Ok(buffer)
3142                }
3143            }
3144        });
3145
3146        cx.spawn(|_, _: &mut AsyncApp| async move { rx.await? })
3147    }
3148
3149    fn open_local_commit_buffer(
3150        language_registry: Option<Arc<LanguageRegistry>>,
3151        buffer_store: Entity<BufferStore>,
3152        cx: &mut Context<Self>,
3153    ) -> Task<Result<Entity<Buffer>>> {
3154        cx.spawn(async move |repository, cx| {
3155            let buffer = buffer_store
3156                .update(cx, |buffer_store, cx| buffer_store.create_buffer(cx))?
3157                .await?;
3158
3159            if let Some(language_registry) = language_registry {
3160                let git_commit_language = language_registry.language_for_name("Git Commit").await?;
3161                buffer.update(cx, |buffer, cx| {
3162                    buffer.set_language(Some(git_commit_language), cx);
3163                })?;
3164            }
3165
3166            repository.update(cx, |repository, _| {
3167                repository.commit_message_buffer = Some(buffer.clone());
3168            })?;
3169            Ok(buffer)
3170        })
3171    }
3172
3173    pub fn checkout_files(
3174        &mut self,
3175        commit: &str,
3176        paths: Vec<RepoPath>,
3177        _cx: &mut App,
3178    ) -> oneshot::Receiver<Result<()>> {
3179        let commit = commit.to_string();
3180        let id = self.id;
3181
3182        self.send_job(
3183            Some(format!("git checkout {}", commit).into()),
3184            move |git_repo, _| async move {
3185                match git_repo {
3186                    RepositoryState::Local {
3187                        backend,
3188                        environment,
3189                        ..
3190                    } => {
3191                        backend
3192                            .checkout_files(commit, paths, environment.clone())
3193                            .await
3194                    }
3195                    RepositoryState::Remote { project_id, client } => {
3196                        client
3197                            .request(proto::GitCheckoutFiles {
3198                                project_id: project_id.0,
3199                                repository_id: id.to_proto(),
3200                                commit,
3201                                paths: paths
3202                                    .into_iter()
3203                                    .map(|p| p.to_string_lossy().to_string())
3204                                    .collect(),
3205                            })
3206                            .await?;
3207
3208                        Ok(())
3209                    }
3210                }
3211            },
3212        )
3213    }
3214
3215    pub fn reset(
3216        &mut self,
3217        commit: String,
3218        reset_mode: ResetMode,
3219        _cx: &mut App,
3220    ) -> oneshot::Receiver<Result<()>> {
3221        let commit = commit.to_string();
3222        let id = self.id;
3223
3224        self.send_job(None, move |git_repo, _| async move {
3225            match git_repo {
3226                RepositoryState::Local {
3227                    backend,
3228                    environment,
3229                    ..
3230                } => backend.reset(commit, reset_mode, environment).await,
3231                RepositoryState::Remote { project_id, client } => {
3232                    client
3233                        .request(proto::GitReset {
3234                            project_id: project_id.0,
3235                            repository_id: id.to_proto(),
3236                            commit,
3237                            mode: match reset_mode {
3238                                ResetMode::Soft => git_reset::ResetMode::Soft.into(),
3239                                ResetMode::Mixed => git_reset::ResetMode::Mixed.into(),
3240                            },
3241                        })
3242                        .await?;
3243
3244                    Ok(())
3245                }
3246            }
3247        })
3248    }
3249
3250    pub fn show(&mut self, commit: String) -> oneshot::Receiver<Result<CommitDetails>> {
3251        let id = self.id;
3252        self.send_job(None, move |git_repo, _cx| async move {
3253            match git_repo {
3254                RepositoryState::Local { backend, .. } => backend.show(commit).await,
3255                RepositoryState::Remote { project_id, client } => {
3256                    let resp = client
3257                        .request(proto::GitShow {
3258                            project_id: project_id.0,
3259                            repository_id: id.to_proto(),
3260                            commit,
3261                        })
3262                        .await?;
3263
3264                    Ok(CommitDetails {
3265                        sha: resp.sha.into(),
3266                        message: resp.message.into(),
3267                        commit_timestamp: resp.commit_timestamp,
3268                        author_email: resp.author_email.into(),
3269                        author_name: resp.author_name.into(),
3270                    })
3271                }
3272            }
3273        })
3274    }
3275
3276    pub fn load_commit_diff(&mut self, commit: String) -> oneshot::Receiver<Result<CommitDiff>> {
3277        let id = self.id;
3278        self.send_job(None, move |git_repo, cx| async move {
3279            match git_repo {
3280                RepositoryState::Local { backend, .. } => backend.load_commit(commit, cx).await,
3281                RepositoryState::Remote {
3282                    client, project_id, ..
3283                } => {
3284                    let response = client
3285                        .request(proto::LoadCommitDiff {
3286                            project_id: project_id.0,
3287                            repository_id: id.to_proto(),
3288                            commit,
3289                        })
3290                        .await?;
3291                    Ok(CommitDiff {
3292                        files: response
3293                            .files
3294                            .into_iter()
3295                            .map(|file| CommitFile {
3296                                path: Path::new(&file.path).into(),
3297                                old_text: file.old_text,
3298                                new_text: file.new_text,
3299                            })
3300                            .collect(),
3301                    })
3302                }
3303            }
3304        })
3305    }
3306
3307    fn buffer_store(&self, cx: &App) -> Option<Entity<BufferStore>> {
3308        Some(self.git_store.upgrade()?.read(cx).buffer_store.clone())
3309    }
3310
3311    pub fn stage_entries(
3312        &self,
3313        entries: Vec<RepoPath>,
3314        cx: &mut Context<Self>,
3315    ) -> Task<anyhow::Result<()>> {
3316        if entries.is_empty() {
3317            return Task::ready(Ok(()));
3318        }
3319        let id = self.id;
3320
3321        let mut save_futures = Vec::new();
3322        if let Some(buffer_store) = self.buffer_store(cx) {
3323            buffer_store.update(cx, |buffer_store, cx| {
3324                for path in &entries {
3325                    let Some(project_path) = self.repo_path_to_project_path(path, cx) else {
3326                        continue;
3327                    };
3328                    if let Some(buffer) = buffer_store.get_by_path(&project_path, cx) {
3329                        if buffer
3330                            .read(cx)
3331                            .file()
3332                            .map_or(false, |file| file.disk_state().exists())
3333                        {
3334                            save_futures.push(buffer_store.save_buffer(buffer, cx));
3335                        }
3336                    }
3337                }
3338            })
3339        }
3340
3341        cx.spawn(async move |this, cx| {
3342            for save_future in save_futures {
3343                save_future.await?;
3344            }
3345
3346            this.update(cx, |this, _| {
3347                this.send_job(None, move |git_repo, _cx| async move {
3348                    match git_repo {
3349                        RepositoryState::Local {
3350                            backend,
3351                            environment,
3352                            ..
3353                        } => backend.stage_paths(entries, environment.clone()).await,
3354                        RepositoryState::Remote { project_id, client } => {
3355                            client
3356                                .request(proto::Stage {
3357                                    project_id: project_id.0,
3358                                    repository_id: id.to_proto(),
3359                                    paths: entries
3360                                        .into_iter()
3361                                        .map(|repo_path| repo_path.as_ref().to_proto())
3362                                        .collect(),
3363                                })
3364                                .await
3365                                .context("sending stage request")?;
3366
3367                            Ok(())
3368                        }
3369                    }
3370                })
3371            })?
3372            .await??;
3373
3374            Ok(())
3375        })
3376    }
3377
3378    pub fn unstage_entries(
3379        &self,
3380        entries: Vec<RepoPath>,
3381        cx: &mut Context<Self>,
3382    ) -> Task<anyhow::Result<()>> {
3383        if entries.is_empty() {
3384            return Task::ready(Ok(()));
3385        }
3386        let id = self.id;
3387
3388        let mut save_futures = Vec::new();
3389        if let Some(buffer_store) = self.buffer_store(cx) {
3390            buffer_store.update(cx, |buffer_store, cx| {
3391                for path in &entries {
3392                    let Some(project_path) = self.repo_path_to_project_path(path, cx) else {
3393                        continue;
3394                    };
3395                    if let Some(buffer) = buffer_store.get_by_path(&project_path, cx) {
3396                        if buffer
3397                            .read(cx)
3398                            .file()
3399                            .map_or(false, |file| file.disk_state().exists())
3400                        {
3401                            save_futures.push(buffer_store.save_buffer(buffer, cx));
3402                        }
3403                    }
3404                }
3405            })
3406        }
3407
3408        cx.spawn(async move |this, cx| {
3409            for save_future in save_futures {
3410                save_future.await?;
3411            }
3412
3413            this.update(cx, |this, _| {
3414                this.send_job(None, move |git_repo, _cx| async move {
3415                    match git_repo {
3416                        RepositoryState::Local {
3417                            backend,
3418                            environment,
3419                            ..
3420                        } => backend.unstage_paths(entries, environment).await,
3421                        RepositoryState::Remote { project_id, client } => {
3422                            client
3423                                .request(proto::Unstage {
3424                                    project_id: project_id.0,
3425                                    repository_id: id.to_proto(),
3426                                    paths: entries
3427                                        .into_iter()
3428                                        .map(|repo_path| repo_path.as_ref().to_proto())
3429                                        .collect(),
3430                                })
3431                                .await
3432                                .context("sending unstage request")?;
3433
3434                            Ok(())
3435                        }
3436                    }
3437                })
3438            })?
3439            .await??;
3440
3441            Ok(())
3442        })
3443    }
3444
3445    pub fn stage_all(&self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
3446        let to_stage = self
3447            .cached_status()
3448            .filter(|entry| !entry.status.staging().is_fully_staged())
3449            .map(|entry| entry.repo_path.clone())
3450            .collect();
3451        self.stage_entries(to_stage, cx)
3452    }
3453
3454    pub fn unstage_all(&self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
3455        let to_unstage = self
3456            .cached_status()
3457            .filter(|entry| entry.status.staging().has_staged())
3458            .map(|entry| entry.repo_path.clone())
3459            .collect();
3460        self.unstage_entries(to_unstage, cx)
3461    }
3462
3463    pub fn commit(
3464        &mut self,
3465        message: SharedString,
3466        name_and_email: Option<(SharedString, SharedString)>,
3467        options: CommitOptions,
3468        _cx: &mut App,
3469    ) -> oneshot::Receiver<Result<()>> {
3470        let id = self.id;
3471
3472        self.send_job(Some("git commit".into()), move |git_repo, _cx| async move {
3473            match git_repo {
3474                RepositoryState::Local {
3475                    backend,
3476                    environment,
3477                    ..
3478                } => {
3479                    backend
3480                        .commit(message, name_and_email, options, environment)
3481                        .await
3482                }
3483                RepositoryState::Remote { project_id, client } => {
3484                    let (name, email) = name_and_email.unzip();
3485                    client
3486                        .request(proto::Commit {
3487                            project_id: project_id.0,
3488                            repository_id: id.to_proto(),
3489                            message: String::from(message),
3490                            name: name.map(String::from),
3491                            email: email.map(String::from),
3492                            options: Some(proto::commit::CommitOptions {
3493                                amend: options.amend,
3494                            }),
3495                        })
3496                        .await
3497                        .context("sending commit request")?;
3498
3499                    Ok(())
3500                }
3501            }
3502        })
3503    }
3504
3505    pub fn fetch(
3506        &mut self,
3507        askpass: AskPassDelegate,
3508        _cx: &mut App,
3509    ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
3510        let askpass_delegates = self.askpass_delegates.clone();
3511        let askpass_id = util::post_inc(&mut self.latest_askpass_id);
3512        let id = self.id;
3513
3514        self.send_job(Some("git fetch".into()), move |git_repo, cx| async move {
3515            match git_repo {
3516                RepositoryState::Local {
3517                    backend,
3518                    environment,
3519                    ..
3520                } => backend.fetch(askpass, environment, cx).await,
3521                RepositoryState::Remote { project_id, client } => {
3522                    askpass_delegates.lock().insert(askpass_id, askpass);
3523                    let _defer = util::defer(|| {
3524                        let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
3525                        debug_assert!(askpass_delegate.is_some());
3526                    });
3527
3528                    let response = client
3529                        .request(proto::Fetch {
3530                            project_id: project_id.0,
3531                            repository_id: id.to_proto(),
3532                            askpass_id,
3533                        })
3534                        .await
3535                        .context("sending fetch request")?;
3536
3537                    Ok(RemoteCommandOutput {
3538                        stdout: response.stdout,
3539                        stderr: response.stderr,
3540                    })
3541                }
3542            }
3543        })
3544    }
3545
3546    pub fn push(
3547        &mut self,
3548        branch: SharedString,
3549        remote: SharedString,
3550        options: Option<PushOptions>,
3551        askpass: AskPassDelegate,
3552        cx: &mut Context<Self>,
3553    ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
3554        let askpass_delegates = self.askpass_delegates.clone();
3555        let askpass_id = util::post_inc(&mut self.latest_askpass_id);
3556        let id = self.id;
3557
3558        let args = options
3559            .map(|option| match option {
3560                PushOptions::SetUpstream => " --set-upstream",
3561                PushOptions::Force => " --force",
3562            })
3563            .unwrap_or("");
3564
3565        let updates_tx = self
3566            .git_store()
3567            .and_then(|git_store| match &git_store.read(cx).state {
3568                GitStoreState::Local { downstream, .. } => downstream
3569                    .as_ref()
3570                    .map(|downstream| downstream.updates_tx.clone()),
3571                _ => None,
3572            });
3573
3574        let this = cx.weak_entity();
3575        self.send_job(
3576            Some(format!("git push {} {} {}", args, branch, remote).into()),
3577            move |git_repo, mut cx| async move {
3578                match git_repo {
3579                    RepositoryState::Local {
3580                        backend,
3581                        environment,
3582                        ..
3583                    } => {
3584                        let result = backend
3585                            .push(
3586                                branch.to_string(),
3587                                remote.to_string(),
3588                                options,
3589                                askpass,
3590                                environment.clone(),
3591                                cx.clone(),
3592                            )
3593                            .await;
3594                        if result.is_ok() {
3595                            let branches = backend.branches().await?;
3596                            let branch = branches.into_iter().find(|branch| branch.is_head);
3597                            log::info!("head branch after scan is {branch:?}");
3598                            let snapshot = this.update(&mut cx, |this, cx| {
3599                                this.snapshot.branch = branch;
3600                                let snapshot = this.snapshot.clone();
3601                                cx.emit(RepositoryEvent::Updated { full_scan: false });
3602                                snapshot
3603                            })?;
3604                            if let Some(updates_tx) = updates_tx {
3605                                updates_tx
3606                                    .unbounded_send(DownstreamUpdate::UpdateRepository(snapshot))
3607                                    .ok();
3608                            }
3609                        }
3610                        result
3611                    }
3612                    RepositoryState::Remote { project_id, client } => {
3613                        askpass_delegates.lock().insert(askpass_id, askpass);
3614                        let _defer = util::defer(|| {
3615                            let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
3616                            debug_assert!(askpass_delegate.is_some());
3617                        });
3618                        let response = client
3619                            .request(proto::Push {
3620                                project_id: project_id.0,
3621                                repository_id: id.to_proto(),
3622                                askpass_id,
3623                                branch_name: branch.to_string(),
3624                                remote_name: remote.to_string(),
3625                                options: options.map(|options| match options {
3626                                    PushOptions::Force => proto::push::PushOptions::Force,
3627                                    PushOptions::SetUpstream => {
3628                                        proto::push::PushOptions::SetUpstream
3629                                    }
3630                                }
3631                                    as i32),
3632                            })
3633                            .await
3634                            .context("sending push request")?;
3635
3636                        Ok(RemoteCommandOutput {
3637                            stdout: response.stdout,
3638                            stderr: response.stderr,
3639                        })
3640                    }
3641                }
3642            },
3643        )
3644    }
3645
3646    pub fn pull(
3647        &mut self,
3648        branch: SharedString,
3649        remote: SharedString,
3650        askpass: AskPassDelegate,
3651        _cx: &mut App,
3652    ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
3653        let askpass_delegates = self.askpass_delegates.clone();
3654        let askpass_id = util::post_inc(&mut self.latest_askpass_id);
3655        let id = self.id;
3656
3657        self.send_job(
3658            Some(format!("git pull {} {}", remote, branch).into()),
3659            move |git_repo, cx| async move {
3660                match git_repo {
3661                    RepositoryState::Local {
3662                        backend,
3663                        environment,
3664                        ..
3665                    } => {
3666                        backend
3667                            .pull(
3668                                branch.to_string(),
3669                                remote.to_string(),
3670                                askpass,
3671                                environment.clone(),
3672                                cx,
3673                            )
3674                            .await
3675                    }
3676                    RepositoryState::Remote { project_id, client } => {
3677                        askpass_delegates.lock().insert(askpass_id, askpass);
3678                        let _defer = util::defer(|| {
3679                            let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
3680                            debug_assert!(askpass_delegate.is_some());
3681                        });
3682                        let response = client
3683                            .request(proto::Pull {
3684                                project_id: project_id.0,
3685                                repository_id: id.to_proto(),
3686                                askpass_id,
3687                                branch_name: branch.to_string(),
3688                                remote_name: remote.to_string(),
3689                            })
3690                            .await
3691                            .context("sending pull request")?;
3692
3693                        Ok(RemoteCommandOutput {
3694                            stdout: response.stdout,
3695                            stderr: response.stderr,
3696                        })
3697                    }
3698                }
3699            },
3700        )
3701    }
3702
3703    fn spawn_set_index_text_job(
3704        &mut self,
3705        path: RepoPath,
3706        content: Option<String>,
3707        hunk_staging_operation_count: Option<usize>,
3708        cx: &mut Context<Self>,
3709    ) -> oneshot::Receiver<anyhow::Result<()>> {
3710        let id = self.id;
3711        let this = cx.weak_entity();
3712        let git_store = self.git_store.clone();
3713        self.send_keyed_job(
3714            Some(GitJobKey::WriteIndex(path.clone())),
3715            None,
3716            move |git_repo, mut cx| async move {
3717                log::debug!("start updating index text for buffer {}", path.display());
3718                match git_repo {
3719                    RepositoryState::Local {
3720                        backend,
3721                        environment,
3722                        ..
3723                    } => {
3724                        backend
3725                            .set_index_text(path.clone(), content, environment.clone())
3726                            .await?;
3727                    }
3728                    RepositoryState::Remote { project_id, client } => {
3729                        client
3730                            .request(proto::SetIndexText {
3731                                project_id: project_id.0,
3732                                repository_id: id.to_proto(),
3733                                path: path.as_ref().to_proto(),
3734                                text: content,
3735                            })
3736                            .await?;
3737                    }
3738                }
3739                log::debug!("finish updating index text for buffer {}", path.display());
3740
3741                if let Some(hunk_staging_operation_count) = hunk_staging_operation_count {
3742                    let project_path = this
3743                        .read_with(&cx, |this, cx| this.repo_path_to_project_path(&path, cx))
3744                        .ok()
3745                        .flatten();
3746                    git_store.update(&mut cx, |git_store, cx| {
3747                        let buffer_id = git_store
3748                            .buffer_store
3749                            .read(cx)
3750                            .get_by_path(&project_path?, cx)?
3751                            .read(cx)
3752                            .remote_id();
3753                        let diff_state = git_store.diffs.get(&buffer_id)?;
3754                        diff_state.update(cx, |diff_state, _| {
3755                            diff_state.hunk_staging_operation_count_as_of_write =
3756                                hunk_staging_operation_count;
3757                        });
3758                        Some(())
3759                    })?;
3760                }
3761                Ok(())
3762            },
3763        )
3764    }
3765
3766    pub fn get_remotes(
3767        &mut self,
3768        branch_name: Option<String>,
3769    ) -> oneshot::Receiver<Result<Vec<Remote>>> {
3770        let id = self.id;
3771        self.send_job(None, move |repo, _cx| async move {
3772            match repo {
3773                RepositoryState::Local { backend, .. } => backend.get_remotes(branch_name).await,
3774                RepositoryState::Remote { project_id, client } => {
3775                    let response = client
3776                        .request(proto::GetRemotes {
3777                            project_id: project_id.0,
3778                            repository_id: id.to_proto(),
3779                            branch_name,
3780                        })
3781                        .await?;
3782
3783                    let remotes = response
3784                        .remotes
3785                        .into_iter()
3786                        .map(|remotes| git::repository::Remote {
3787                            name: remotes.name.into(),
3788                        })
3789                        .collect();
3790
3791                    Ok(remotes)
3792                }
3793            }
3794        })
3795    }
3796
3797    pub fn branches(&mut self) -> oneshot::Receiver<Result<Vec<Branch>>> {
3798        let id = self.id;
3799        self.send_job(None, move |repo, cx| async move {
3800            match repo {
3801                RepositoryState::Local { backend, .. } => {
3802                    let backend = backend.clone();
3803                    cx.background_spawn(async move { backend.branches().await })
3804                        .await
3805                }
3806                RepositoryState::Remote { project_id, client } => {
3807                    let response = client
3808                        .request(proto::GitGetBranches {
3809                            project_id: project_id.0,
3810                            repository_id: id.to_proto(),
3811                        })
3812                        .await?;
3813
3814                    let branches = response
3815                        .branches
3816                        .into_iter()
3817                        .map(|branch| proto_to_branch(&branch))
3818                        .collect();
3819
3820                    Ok(branches)
3821                }
3822            }
3823        })
3824    }
3825
3826    pub fn diff(&mut self, diff_type: DiffType, _cx: &App) -> oneshot::Receiver<Result<String>> {
3827        let id = self.id;
3828        self.send_job(None, move |repo, _cx| async move {
3829            match repo {
3830                RepositoryState::Local { backend, .. } => backend.diff(diff_type).await,
3831                RepositoryState::Remote { project_id, client } => {
3832                    let response = client
3833                        .request(proto::GitDiff {
3834                            project_id: project_id.0,
3835                            repository_id: id.to_proto(),
3836                            diff_type: match diff_type {
3837                                DiffType::HeadToIndex => {
3838                                    proto::git_diff::DiffType::HeadToIndex.into()
3839                                }
3840                                DiffType::HeadToWorktree => {
3841                                    proto::git_diff::DiffType::HeadToWorktree.into()
3842                                }
3843                            },
3844                        })
3845                        .await?;
3846
3847                    Ok(response.diff)
3848                }
3849            }
3850        })
3851    }
3852
3853    pub fn create_branch(&mut self, branch_name: String) -> oneshot::Receiver<Result<()>> {
3854        let id = self.id;
3855        self.send_job(
3856            Some(format!("git switch -c {branch_name}").into()),
3857            move |repo, _cx| async move {
3858                match repo {
3859                    RepositoryState::Local { backend, .. } => {
3860                        backend.create_branch(branch_name).await
3861                    }
3862                    RepositoryState::Remote { project_id, client } => {
3863                        client
3864                            .request(proto::GitCreateBranch {
3865                                project_id: project_id.0,
3866                                repository_id: id.to_proto(),
3867                                branch_name,
3868                            })
3869                            .await?;
3870
3871                        Ok(())
3872                    }
3873                }
3874            },
3875        )
3876    }
3877
3878    pub fn change_branch(&mut self, branch_name: String) -> oneshot::Receiver<Result<()>> {
3879        let id = self.id;
3880        self.send_job(
3881            Some(format!("git switch {branch_name}").into()),
3882            move |repo, _cx| async move {
3883                match repo {
3884                    RepositoryState::Local { backend, .. } => {
3885                        backend.change_branch(branch_name).await
3886                    }
3887                    RepositoryState::Remote { project_id, client } => {
3888                        client
3889                            .request(proto::GitChangeBranch {
3890                                project_id: project_id.0,
3891                                repository_id: id.to_proto(),
3892                                branch_name,
3893                            })
3894                            .await?;
3895
3896                        Ok(())
3897                    }
3898                }
3899            },
3900        )
3901    }
3902
3903    pub fn check_for_pushed_commits(&mut self) -> oneshot::Receiver<Result<Vec<SharedString>>> {
3904        let id = self.id;
3905        self.send_job(None, move |repo, _cx| async move {
3906            match repo {
3907                RepositoryState::Local { backend, .. } => backend.check_for_pushed_commit().await,
3908                RepositoryState::Remote { project_id, client } => {
3909                    let response = client
3910                        .request(proto::CheckForPushedCommits {
3911                            project_id: project_id.0,
3912                            repository_id: id.to_proto(),
3913                        })
3914                        .await?;
3915
3916                    let branches = response.pushed_to.into_iter().map(Into::into).collect();
3917
3918                    Ok(branches)
3919                }
3920            }
3921        })
3922    }
3923
3924    pub fn checkpoint(&mut self) -> oneshot::Receiver<Result<GitRepositoryCheckpoint>> {
3925        self.send_job(None, |repo, _cx| async move {
3926            match repo {
3927                RepositoryState::Local { backend, .. } => backend.checkpoint().await,
3928                RepositoryState::Remote { .. } => Err(anyhow!("not implemented yet")),
3929            }
3930        })
3931    }
3932
3933    pub fn restore_checkpoint(
3934        &mut self,
3935        checkpoint: GitRepositoryCheckpoint,
3936    ) -> oneshot::Receiver<Result<()>> {
3937        self.send_job(None, move |repo, _cx| async move {
3938            match repo {
3939                RepositoryState::Local { backend, .. } => {
3940                    backend.restore_checkpoint(checkpoint).await
3941                }
3942                RepositoryState::Remote { .. } => Err(anyhow!("not implemented yet")),
3943            }
3944        })
3945    }
3946
3947    pub(crate) fn apply_remote_update(
3948        &mut self,
3949        update: proto::UpdateRepository,
3950        cx: &mut Context<Self>,
3951    ) -> Result<()> {
3952        let conflicted_paths = TreeSet::from_ordered_entries(
3953            update
3954                .current_merge_conflicts
3955                .into_iter()
3956                .map(|path| RepoPath(Path::new(&path).into())),
3957        );
3958        self.snapshot.branch = update.branch_summary.as_ref().map(proto_to_branch);
3959        self.snapshot.head_commit = update
3960            .head_commit_details
3961            .as_ref()
3962            .map(proto_to_commit_details);
3963
3964        self.snapshot.merge.conflicted_paths = conflicted_paths;
3965
3966        let edits = update
3967            .removed_statuses
3968            .into_iter()
3969            .map(|path| sum_tree::Edit::Remove(PathKey(FromProto::from_proto(path))))
3970            .chain(
3971                update
3972                    .updated_statuses
3973                    .into_iter()
3974                    .filter_map(|updated_status| {
3975                        Some(sum_tree::Edit::Insert(updated_status.try_into().log_err()?))
3976                    }),
3977            )
3978            .collect::<Vec<_>>();
3979        self.snapshot.statuses_by_path.edit(edits, &());
3980        if update.is_last_update {
3981            self.snapshot.scan_id = update.scan_id;
3982        }
3983        cx.emit(RepositoryEvent::Updated { full_scan: true });
3984        Ok(())
3985    }
3986
3987    pub fn compare_checkpoints(
3988        &mut self,
3989        left: GitRepositoryCheckpoint,
3990        right: GitRepositoryCheckpoint,
3991    ) -> oneshot::Receiver<Result<bool>> {
3992        self.send_job(None, move |repo, _cx| async move {
3993            match repo {
3994                RepositoryState::Local { backend, .. } => {
3995                    backend.compare_checkpoints(left, right).await
3996                }
3997                RepositoryState::Remote { .. } => Err(anyhow!("not implemented yet")),
3998            }
3999        })
4000    }
4001
4002    pub fn diff_checkpoints(
4003        &mut self,
4004        base_checkpoint: GitRepositoryCheckpoint,
4005        target_checkpoint: GitRepositoryCheckpoint,
4006    ) -> oneshot::Receiver<Result<String>> {
4007        self.send_job(None, move |repo, _cx| async move {
4008            match repo {
4009                RepositoryState::Local { backend, .. } => {
4010                    backend
4011                        .diff_checkpoints(base_checkpoint, target_checkpoint)
4012                        .await
4013                }
4014                RepositoryState::Remote { .. } => Err(anyhow!("not implemented yet")),
4015            }
4016        })
4017    }
4018
4019    fn schedule_scan(
4020        &mut self,
4021        updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
4022        cx: &mut Context<Self>,
4023    ) {
4024        let this = cx.weak_entity();
4025        let _ = self.send_keyed_job(
4026            Some(GitJobKey::ReloadGitState),
4027            None,
4028            |state, mut cx| async move {
4029                let Some(this) = this.upgrade() else {
4030                    return Ok(());
4031                };
4032                let RepositoryState::Local { backend, .. } = state else {
4033                    bail!("not a local repository")
4034                };
4035                let (snapshot, events) = this
4036                    .update(&mut cx, |this, _| {
4037                        compute_snapshot(
4038                            this.id,
4039                            this.work_directory_abs_path.clone(),
4040                            this.snapshot.clone(),
4041                            backend.clone(),
4042                        )
4043                    })?
4044                    .await?;
4045                this.update(&mut cx, |this, cx| {
4046                    this.snapshot = snapshot.clone();
4047                    for event in events {
4048                        cx.emit(event);
4049                    }
4050                })?;
4051                if let Some(updates_tx) = updates_tx {
4052                    updates_tx
4053                        .unbounded_send(DownstreamUpdate::UpdateRepository(snapshot))
4054                        .ok();
4055                }
4056                Ok(())
4057            },
4058        );
4059    }
4060
4061    fn spawn_local_git_worker(
4062        work_directory_abs_path: Arc<Path>,
4063        dot_git_abs_path: Arc<Path>,
4064        _repository_dir_abs_path: Arc<Path>,
4065        _common_dir_abs_path: Arc<Path>,
4066        project_environment: WeakEntity<ProjectEnvironment>,
4067        fs: Arc<dyn Fs>,
4068        cx: &mut Context<Self>,
4069    ) -> mpsc::UnboundedSender<GitJob> {
4070        let (job_tx, mut job_rx) = mpsc::unbounded::<GitJob>();
4071
4072        cx.spawn(async move |_, cx| {
4073            let environment = project_environment
4074                .upgrade()
4075                .ok_or_else(|| anyhow!("missing project environment"))?
4076                .update(cx, |project_environment, cx| {
4077                    project_environment.get_directory_environment(work_directory_abs_path.clone(), cx)
4078                })?
4079                .await
4080                .unwrap_or_else(|| {
4081                    log::error!("failed to get working directory environment for repository {work_directory_abs_path:?}");
4082                    HashMap::default()
4083                });
4084            let backend = cx
4085                .background_spawn(async move {
4086                    fs.open_repo(&dot_git_abs_path)
4087                        .ok_or_else(|| anyhow!("failed to build repository"))
4088                })
4089                .await?;
4090
4091            if let Some(git_hosting_provider_registry) =
4092                cx.update(|cx| GitHostingProviderRegistry::try_global(cx))?
4093            {
4094                git_hosting_providers::register_additional_providers(
4095                    git_hosting_provider_registry,
4096                    backend.clone(),
4097                );
4098            }
4099
4100            let state = RepositoryState::Local {
4101                backend,
4102                environment: Arc::new(environment),
4103            };
4104            let mut jobs = VecDeque::new();
4105            loop {
4106                while let Ok(Some(next_job)) = job_rx.try_next() {
4107                    jobs.push_back(next_job);
4108                }
4109
4110                if let Some(job) = jobs.pop_front() {
4111                    if let Some(current_key) = &job.key {
4112                        if jobs
4113                            .iter()
4114                            .any(|other_job| other_job.key.as_ref() == Some(current_key))
4115                        {
4116                            continue;
4117                        }
4118                    }
4119                    (job.job)(state.clone(), cx).await;
4120                } else if let Some(job) = job_rx.next().await {
4121                    jobs.push_back(job);
4122                } else {
4123                    break;
4124                }
4125            }
4126            anyhow::Ok(())
4127        })
4128        .detach_and_log_err(cx);
4129
4130        job_tx
4131    }
4132
4133    fn spawn_remote_git_worker(
4134        project_id: ProjectId,
4135        client: AnyProtoClient,
4136        cx: &mut Context<Self>,
4137    ) -> mpsc::UnboundedSender<GitJob> {
4138        let (job_tx, mut job_rx) = mpsc::unbounded::<GitJob>();
4139
4140        cx.spawn(async move |_, cx| {
4141            let state = RepositoryState::Remote { project_id, client };
4142            let mut jobs = VecDeque::new();
4143            loop {
4144                while let Ok(Some(next_job)) = job_rx.try_next() {
4145                    jobs.push_back(next_job);
4146                }
4147
4148                if let Some(job) = jobs.pop_front() {
4149                    if let Some(current_key) = &job.key {
4150                        if jobs
4151                            .iter()
4152                            .any(|other_job| other_job.key.as_ref() == Some(current_key))
4153                        {
4154                            continue;
4155                        }
4156                    }
4157                    (job.job)(state.clone(), cx).await;
4158                } else if let Some(job) = job_rx.next().await {
4159                    jobs.push_back(job);
4160                } else {
4161                    break;
4162                }
4163            }
4164            anyhow::Ok(())
4165        })
4166        .detach_and_log_err(cx);
4167
4168        job_tx
4169    }
4170
4171    fn load_staged_text(
4172        &mut self,
4173        buffer_id: BufferId,
4174        repo_path: RepoPath,
4175        cx: &App,
4176    ) -> Task<Result<Option<String>>> {
4177        let rx = self.send_job(None, move |state, _| async move {
4178            match state {
4179                RepositoryState::Local { backend, .. } => {
4180                    anyhow::Ok(backend.load_index_text(repo_path).await)
4181                }
4182                RepositoryState::Remote { project_id, client } => {
4183                    let response = client
4184                        .request(proto::OpenUnstagedDiff {
4185                            project_id: project_id.to_proto(),
4186                            buffer_id: buffer_id.to_proto(),
4187                        })
4188                        .await?;
4189                    Ok(response.staged_text)
4190                }
4191            }
4192        });
4193        cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
4194    }
4195
4196    fn load_committed_text(
4197        &mut self,
4198        buffer_id: BufferId,
4199        repo_path: RepoPath,
4200        cx: &App,
4201    ) -> Task<Result<DiffBasesChange>> {
4202        let rx = self.send_job(None, move |state, _| async move {
4203            match state {
4204                RepositoryState::Local { backend, .. } => {
4205                    let committed_text = backend.load_committed_text(repo_path.clone()).await;
4206                    let staged_text = backend.load_index_text(repo_path).await;
4207                    let diff_bases_change = if committed_text == staged_text {
4208                        DiffBasesChange::SetBoth(committed_text)
4209                    } else {
4210                        DiffBasesChange::SetEach {
4211                            index: staged_text,
4212                            head: committed_text,
4213                        }
4214                    };
4215                    anyhow::Ok(diff_bases_change)
4216                }
4217                RepositoryState::Remote { project_id, client } => {
4218                    use proto::open_uncommitted_diff_response::Mode;
4219
4220                    let response = client
4221                        .request(proto::OpenUncommittedDiff {
4222                            project_id: project_id.to_proto(),
4223                            buffer_id: buffer_id.to_proto(),
4224                        })
4225                        .await?;
4226                    let mode =
4227                        Mode::from_i32(response.mode).ok_or_else(|| anyhow!("Invalid mode"))?;
4228                    let bases = match mode {
4229                        Mode::IndexMatchesHead => DiffBasesChange::SetBoth(response.committed_text),
4230                        Mode::IndexAndHead => DiffBasesChange::SetEach {
4231                            head: response.committed_text,
4232                            index: response.staged_text,
4233                        },
4234                    };
4235                    Ok(bases)
4236                }
4237            }
4238        });
4239
4240        cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
4241    }
4242
4243    fn paths_changed(
4244        &mut self,
4245        paths: Vec<RepoPath>,
4246        updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
4247        cx: &mut Context<Self>,
4248    ) {
4249        self.paths_needing_status_update.extend(paths);
4250
4251        let this = cx.weak_entity();
4252        let _ = self.send_keyed_job(
4253            Some(GitJobKey::RefreshStatuses),
4254            None,
4255            |state, mut cx| async move {
4256                let (prev_snapshot, mut changed_paths) = this.update(&mut cx, |this, _| {
4257                    (
4258                        this.snapshot.clone(),
4259                        mem::take(&mut this.paths_needing_status_update),
4260                    )
4261                })?;
4262                let RepositoryState::Local { backend, .. } = state else {
4263                    bail!("not a local repository")
4264                };
4265
4266                let paths = changed_paths.iter().cloned().collect::<Vec<_>>();
4267                let statuses = backend.status(&paths).await?;
4268
4269                let changed_path_statuses = cx
4270                    .background_spawn(async move {
4271                        let mut changed_path_statuses = Vec::new();
4272                        let prev_statuses = prev_snapshot.statuses_by_path.clone();
4273                        let mut cursor = prev_statuses.cursor::<PathProgress>(&());
4274
4275                        for (repo_path, status) in &*statuses.entries {
4276                            changed_paths.remove(repo_path);
4277                            if cursor.seek_forward(&PathTarget::Path(repo_path), Bias::Left, &()) {
4278                                if cursor.item().is_some_and(|entry| entry.status == *status) {
4279                                    continue;
4280                                }
4281                            }
4282
4283                            changed_path_statuses.push(Edit::Insert(StatusEntry {
4284                                repo_path: repo_path.clone(),
4285                                status: *status,
4286                            }));
4287                        }
4288                        let mut cursor = prev_statuses.cursor::<PathProgress>(&());
4289                        for path in changed_paths.iter() {
4290                            if cursor.seek_forward(&PathTarget::Path(&path), Bias::Left, &()) {
4291                                changed_path_statuses.push(Edit::Remove(PathKey(path.0.clone())));
4292                            }
4293                        }
4294                        changed_path_statuses
4295                    })
4296                    .await;
4297
4298                this.update(&mut cx, |this, cx| {
4299                    if !changed_path_statuses.is_empty() {
4300                        this.snapshot
4301                            .statuses_by_path
4302                            .edit(changed_path_statuses, &());
4303                        this.snapshot.scan_id += 1;
4304                        if let Some(updates_tx) = updates_tx {
4305                            updates_tx
4306                                .unbounded_send(DownstreamUpdate::UpdateRepository(
4307                                    this.snapshot.clone(),
4308                                ))
4309                                .ok();
4310                        }
4311                    }
4312                    cx.emit(RepositoryEvent::Updated { full_scan: false });
4313                })
4314            },
4315        );
4316    }
4317
4318    /// currently running git command and when it started
4319    pub fn current_job(&self) -> Option<JobInfo> {
4320        self.active_jobs.values().next().cloned()
4321    }
4322
4323    pub fn barrier(&mut self) -> oneshot::Receiver<()> {
4324        self.send_job(None, |_, _| async {})
4325    }
4326}
4327
4328fn get_permalink_in_rust_registry_src(
4329    provider_registry: Arc<GitHostingProviderRegistry>,
4330    path: PathBuf,
4331    selection: Range<u32>,
4332) -> Result<url::Url> {
4333    #[derive(Deserialize)]
4334    struct CargoVcsGit {
4335        sha1: String,
4336    }
4337
4338    #[derive(Deserialize)]
4339    struct CargoVcsInfo {
4340        git: CargoVcsGit,
4341        path_in_vcs: String,
4342    }
4343
4344    #[derive(Deserialize)]
4345    struct CargoPackage {
4346        repository: String,
4347    }
4348
4349    #[derive(Deserialize)]
4350    struct CargoToml {
4351        package: CargoPackage,
4352    }
4353
4354    let Some((dir, cargo_vcs_info_json)) = path.ancestors().skip(1).find_map(|dir| {
4355        let json = std::fs::read_to_string(dir.join(".cargo_vcs_info.json")).ok()?;
4356        Some((dir, json))
4357    }) else {
4358        bail!("No .cargo_vcs_info.json found in parent directories")
4359    };
4360    let cargo_vcs_info = serde_json::from_str::<CargoVcsInfo>(&cargo_vcs_info_json)?;
4361    let cargo_toml = std::fs::read_to_string(dir.join("Cargo.toml"))?;
4362    let manifest = toml::from_str::<CargoToml>(&cargo_toml)?;
4363    let (provider, remote) = parse_git_remote_url(provider_registry, &manifest.package.repository)
4364        .ok_or_else(|| anyhow!("Failed to parse package.repository field of manifest"))?;
4365    let path = PathBuf::from(cargo_vcs_info.path_in_vcs).join(path.strip_prefix(dir).unwrap());
4366    let permalink = provider.build_permalink(
4367        remote,
4368        BuildPermalinkParams {
4369            sha: &cargo_vcs_info.git.sha1,
4370            path: &path.to_string_lossy(),
4371            selection: Some(selection),
4372        },
4373    );
4374    Ok(permalink)
4375}
4376
4377fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
4378    let Some(blame) = blame else {
4379        return proto::BlameBufferResponse {
4380            blame_response: None,
4381        };
4382    };
4383
4384    let entries = blame
4385        .entries
4386        .into_iter()
4387        .map(|entry| proto::BlameEntry {
4388            sha: entry.sha.as_bytes().into(),
4389            start_line: entry.range.start,
4390            end_line: entry.range.end,
4391            original_line_number: entry.original_line_number,
4392            author: entry.author.clone(),
4393            author_mail: entry.author_mail.clone(),
4394            author_time: entry.author_time,
4395            author_tz: entry.author_tz.clone(),
4396            committer: entry.committer_name.clone(),
4397            committer_mail: entry.committer_email.clone(),
4398            committer_time: entry.committer_time,
4399            committer_tz: entry.committer_tz.clone(),
4400            summary: entry.summary.clone(),
4401            previous: entry.previous.clone(),
4402            filename: entry.filename.clone(),
4403        })
4404        .collect::<Vec<_>>();
4405
4406    let messages = blame
4407        .messages
4408        .into_iter()
4409        .map(|(oid, message)| proto::CommitMessage {
4410            oid: oid.as_bytes().into(),
4411            message,
4412        })
4413        .collect::<Vec<_>>();
4414
4415    proto::BlameBufferResponse {
4416        blame_response: Some(proto::blame_buffer_response::BlameResponse {
4417            entries,
4418            messages,
4419            remote_url: blame.remote_url,
4420        }),
4421    }
4422}
4423
4424fn deserialize_blame_buffer_response(
4425    response: proto::BlameBufferResponse,
4426) -> Option<git::blame::Blame> {
4427    let response = response.blame_response?;
4428    let entries = response
4429        .entries
4430        .into_iter()
4431        .filter_map(|entry| {
4432            Some(git::blame::BlameEntry {
4433                sha: git::Oid::from_bytes(&entry.sha).ok()?,
4434                range: entry.start_line..entry.end_line,
4435                original_line_number: entry.original_line_number,
4436                committer_name: entry.committer,
4437                committer_time: entry.committer_time,
4438                committer_tz: entry.committer_tz,
4439                committer_email: entry.committer_mail,
4440                author: entry.author,
4441                author_mail: entry.author_mail,
4442                author_time: entry.author_time,
4443                author_tz: entry.author_tz,
4444                summary: entry.summary,
4445                previous: entry.previous,
4446                filename: entry.filename,
4447            })
4448        })
4449        .collect::<Vec<_>>();
4450
4451    let messages = response
4452        .messages
4453        .into_iter()
4454        .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
4455        .collect::<HashMap<_, _>>();
4456
4457    Some(Blame {
4458        entries,
4459        messages,
4460        remote_url: response.remote_url,
4461    })
4462}
4463
4464fn branch_to_proto(branch: &git::repository::Branch) -> proto::Branch {
4465    proto::Branch {
4466        is_head: branch.is_head,
4467        name: branch.name.to_string(),
4468        unix_timestamp: branch
4469            .most_recent_commit
4470            .as_ref()
4471            .map(|commit| commit.commit_timestamp as u64),
4472        upstream: branch.upstream.as_ref().map(|upstream| proto::GitUpstream {
4473            ref_name: upstream.ref_name.to_string(),
4474            tracking: upstream
4475                .tracking
4476                .status()
4477                .map(|upstream| proto::UpstreamTracking {
4478                    ahead: upstream.ahead as u64,
4479                    behind: upstream.behind as u64,
4480                }),
4481        }),
4482        most_recent_commit: branch
4483            .most_recent_commit
4484            .as_ref()
4485            .map(|commit| proto::CommitSummary {
4486                sha: commit.sha.to_string(),
4487                subject: commit.subject.to_string(),
4488                commit_timestamp: commit.commit_timestamp,
4489            }),
4490    }
4491}
4492
4493fn proto_to_branch(proto: &proto::Branch) -> git::repository::Branch {
4494    git::repository::Branch {
4495        is_head: proto.is_head,
4496        name: proto.name.clone().into(),
4497        upstream: proto
4498            .upstream
4499            .as_ref()
4500            .map(|upstream| git::repository::Upstream {
4501                ref_name: upstream.ref_name.to_string().into(),
4502                tracking: upstream
4503                    .tracking
4504                    .as_ref()
4505                    .map(|tracking| {
4506                        git::repository::UpstreamTracking::Tracked(UpstreamTrackingStatus {
4507                            ahead: tracking.ahead as u32,
4508                            behind: tracking.behind as u32,
4509                        })
4510                    })
4511                    .unwrap_or(git::repository::UpstreamTracking::Gone),
4512            }),
4513        most_recent_commit: proto.most_recent_commit.as_ref().map(|commit| {
4514            git::repository::CommitSummary {
4515                sha: commit.sha.to_string().into(),
4516                subject: commit.subject.to_string().into(),
4517                commit_timestamp: commit.commit_timestamp,
4518                has_parent: true,
4519            }
4520        }),
4521    }
4522}
4523
4524fn commit_details_to_proto(commit: &CommitDetails) -> proto::GitCommitDetails {
4525    proto::GitCommitDetails {
4526        sha: commit.sha.to_string(),
4527        message: commit.message.to_string(),
4528        commit_timestamp: commit.commit_timestamp,
4529        author_email: commit.author_email.to_string(),
4530        author_name: commit.author_name.to_string(),
4531    }
4532}
4533
4534fn proto_to_commit_details(proto: &proto::GitCommitDetails) -> CommitDetails {
4535    CommitDetails {
4536        sha: proto.sha.clone().into(),
4537        message: proto.message.clone().into(),
4538        commit_timestamp: proto.commit_timestamp,
4539        author_email: proto.author_email.clone().into(),
4540        author_name: proto.author_name.clone().into(),
4541    }
4542}
4543
4544async fn compute_snapshot(
4545    id: RepositoryId,
4546    work_directory_abs_path: Arc<Path>,
4547    prev_snapshot: RepositorySnapshot,
4548    backend: Arc<dyn GitRepository>,
4549) -> Result<(RepositorySnapshot, Vec<RepositoryEvent>)> {
4550    let mut events = Vec::new();
4551    let branches = backend.branches().await?;
4552    let branch = branches.into_iter().find(|branch| branch.is_head);
4553    let statuses = backend.status(&[WORK_DIRECTORY_REPO_PATH.clone()]).await?;
4554    let statuses_by_path = SumTree::from_iter(
4555        statuses
4556            .entries
4557            .iter()
4558            .map(|(repo_path, status)| StatusEntry {
4559                repo_path: repo_path.clone(),
4560                status: *status,
4561            }),
4562        &(),
4563    );
4564    let (merge_details, merge_heads_changed) =
4565        MergeDetails::load(&backend, &statuses_by_path, &prev_snapshot).await?;
4566
4567    if merge_heads_changed
4568        || branch != prev_snapshot.branch
4569        || statuses_by_path != prev_snapshot.statuses_by_path
4570    {
4571        events.push(RepositoryEvent::Updated { full_scan: true });
4572    }
4573
4574    // Cache merge conflict paths so they don't change from staging/unstaging,
4575    // until the merge heads change (at commit time, etc.).
4576    if merge_heads_changed {
4577        events.push(RepositoryEvent::MergeHeadsChanged);
4578    }
4579
4580    // Useful when branch is None in detached head state
4581    let head_commit = match backend.head_sha() {
4582        Some(head_sha) => backend.show(head_sha).await.log_err(),
4583        None => None,
4584    };
4585
4586    let snapshot = RepositorySnapshot {
4587        id,
4588        statuses_by_path,
4589        work_directory_abs_path,
4590        scan_id: prev_snapshot.scan_id + 1,
4591        branch,
4592        head_commit,
4593        merge: merge_details,
4594    };
4595
4596    Ok((snapshot, events))
4597}
4598
4599fn status_from_proto(
4600    simple_status: i32,
4601    status: Option<proto::GitFileStatus>,
4602) -> anyhow::Result<FileStatus> {
4603    use proto::git_file_status::Variant;
4604
4605    let Some(variant) = status.and_then(|status| status.variant) else {
4606        let code = proto::GitStatus::from_i32(simple_status)
4607            .ok_or_else(|| anyhow!("Invalid git status code: {simple_status}"))?;
4608        let result = match code {
4609            proto::GitStatus::Added => TrackedStatus {
4610                worktree_status: StatusCode::Added,
4611                index_status: StatusCode::Unmodified,
4612            }
4613            .into(),
4614            proto::GitStatus::Modified => TrackedStatus {
4615                worktree_status: StatusCode::Modified,
4616                index_status: StatusCode::Unmodified,
4617            }
4618            .into(),
4619            proto::GitStatus::Conflict => UnmergedStatus {
4620                first_head: UnmergedStatusCode::Updated,
4621                second_head: UnmergedStatusCode::Updated,
4622            }
4623            .into(),
4624            proto::GitStatus::Deleted => TrackedStatus {
4625                worktree_status: StatusCode::Deleted,
4626                index_status: StatusCode::Unmodified,
4627            }
4628            .into(),
4629            _ => return Err(anyhow!("Invalid code for simple status: {simple_status}")),
4630        };
4631        return Ok(result);
4632    };
4633
4634    let result = match variant {
4635        Variant::Untracked(_) => FileStatus::Untracked,
4636        Variant::Ignored(_) => FileStatus::Ignored,
4637        Variant::Unmerged(unmerged) => {
4638            let [first_head, second_head] =
4639                [unmerged.first_head, unmerged.second_head].map(|head| {
4640                    let code = proto::GitStatus::from_i32(head)
4641                        .ok_or_else(|| anyhow!("Invalid git status code: {head}"))?;
4642                    let result = match code {
4643                        proto::GitStatus::Added => UnmergedStatusCode::Added,
4644                        proto::GitStatus::Updated => UnmergedStatusCode::Updated,
4645                        proto::GitStatus::Deleted => UnmergedStatusCode::Deleted,
4646                        _ => return Err(anyhow!("Invalid code for unmerged status: {code:?}")),
4647                    };
4648                    Ok(result)
4649                });
4650            let [first_head, second_head] = [first_head?, second_head?];
4651            UnmergedStatus {
4652                first_head,
4653                second_head,
4654            }
4655            .into()
4656        }
4657        Variant::Tracked(tracked) => {
4658            let [index_status, worktree_status] = [tracked.index_status, tracked.worktree_status]
4659                .map(|status| {
4660                    let code = proto::GitStatus::from_i32(status)
4661                        .ok_or_else(|| anyhow!("Invalid git status code: {status}"))?;
4662                    let result = match code {
4663                        proto::GitStatus::Modified => StatusCode::Modified,
4664                        proto::GitStatus::TypeChanged => StatusCode::TypeChanged,
4665                        proto::GitStatus::Added => StatusCode::Added,
4666                        proto::GitStatus::Deleted => StatusCode::Deleted,
4667                        proto::GitStatus::Renamed => StatusCode::Renamed,
4668                        proto::GitStatus::Copied => StatusCode::Copied,
4669                        proto::GitStatus::Unmodified => StatusCode::Unmodified,
4670                        _ => return Err(anyhow!("Invalid code for tracked status: {code:?}")),
4671                    };
4672                    Ok(result)
4673                });
4674            let [index_status, worktree_status] = [index_status?, worktree_status?];
4675            TrackedStatus {
4676                index_status,
4677                worktree_status,
4678            }
4679            .into()
4680        }
4681    };
4682    Ok(result)
4683}
4684
4685fn status_to_proto(status: FileStatus) -> proto::GitFileStatus {
4686    use proto::git_file_status::{Tracked, Unmerged, Variant};
4687
4688    let variant = match status {
4689        FileStatus::Untracked => Variant::Untracked(Default::default()),
4690        FileStatus::Ignored => Variant::Ignored(Default::default()),
4691        FileStatus::Unmerged(UnmergedStatus {
4692            first_head,
4693            second_head,
4694        }) => Variant::Unmerged(Unmerged {
4695            first_head: unmerged_status_to_proto(first_head),
4696            second_head: unmerged_status_to_proto(second_head),
4697        }),
4698        FileStatus::Tracked(TrackedStatus {
4699            index_status,
4700            worktree_status,
4701        }) => Variant::Tracked(Tracked {
4702            index_status: tracked_status_to_proto(index_status),
4703            worktree_status: tracked_status_to_proto(worktree_status),
4704        }),
4705    };
4706    proto::GitFileStatus {
4707        variant: Some(variant),
4708    }
4709}
4710
4711fn unmerged_status_to_proto(code: UnmergedStatusCode) -> i32 {
4712    match code {
4713        UnmergedStatusCode::Added => proto::GitStatus::Added as _,
4714        UnmergedStatusCode::Deleted => proto::GitStatus::Deleted as _,
4715        UnmergedStatusCode::Updated => proto::GitStatus::Updated as _,
4716    }
4717}
4718
4719fn tracked_status_to_proto(code: StatusCode) -> i32 {
4720    match code {
4721        StatusCode::Added => proto::GitStatus::Added as _,
4722        StatusCode::Deleted => proto::GitStatus::Deleted as _,
4723        StatusCode::Modified => proto::GitStatus::Modified as _,
4724        StatusCode::Renamed => proto::GitStatus::Renamed as _,
4725        StatusCode::TypeChanged => proto::GitStatus::TypeChanged as _,
4726        StatusCode::Copied => proto::GitStatus::Copied as _,
4727        StatusCode::Unmodified => proto::GitStatus::Unmodified as _,
4728    }
4729}