mod conflict_set;
pub mod git_traversal;

use crate::{
    ProjectEnvironment, ProjectItem, ProjectPath,
    buffer_store::{BufferStore, BufferStoreEvent},
    worktree_store::{WorktreeStore, WorktreeStoreEvent},
};
use anyhow::{Context as _, Result, anyhow, bail};
use askpass::AskPassDelegate;
use buffer_diff::{BufferDiff, BufferDiffEvent};
use client::ProjectId;
use collections::HashMap;
pub use conflict_set::{ConflictRegion, ConflictSet, ConflictSetSnapshot, ConflictSetUpdate};
use fs::Fs;
use futures::{
    FutureExt, StreamExt as _,
    channel::{mpsc, oneshot},
    future::{self, Shared},
};
use git::{
    BuildPermalinkParams, GitHostingProviderRegistry, WORK_DIRECTORY_REPO_PATH,
    blame::Blame,
    parse_git_remote_url,
    repository::{
        Branch, CommitDetails, CommitDiff, CommitFile, CommitOptions, DiffType, GitRepository,
        GitRepositoryCheckpoint, PushOptions, Remote, RemoteCommandOutput, RepoPath, ResetMode,
        UpstreamTrackingStatus,
    },
    status::{
        FileStatus, GitSummary, StatusCode, TrackedStatus, UnmergedStatus, UnmergedStatusCode,
    },
};
use gpui::{
    App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Subscription, Task,
    WeakEntity,
};
use language::{
    Buffer, BufferEvent, Language, LanguageRegistry,
    proto::{deserialize_version, serialize_version},
};
use parking_lot::Mutex;
use postage::stream::Stream as _;
use rpc::{
    AnyProtoClient, TypedEnvelope,
    proto::{self, FromProto, SSH_PROJECT_ID, ToProto, git_reset, split_repository_update},
};
use serde::Deserialize;
use std::{
    cmp::Ordering,
    collections::{BTreeSet, VecDeque},
    future::Future,
    mem,
    ops::Range,
    path::{Path, PathBuf},
    sync::{
        Arc,
        atomic::{self, AtomicU64},
    },
    time::Instant,
};
use sum_tree::{Edit, SumTree, TreeSet};
use text::{Bias, BufferId};
use util::{ResultExt, debug_panic, post_inc};
use worktree::{
    File, PathKey, PathProgress, PathSummary, PathTarget, UpdatedGitRepositoriesSet,
    UpdatedGitRepository, Worktree,
};

pub struct GitStore {
    state: GitStoreState,
    buffer_store: Entity<BufferStore>,
    worktree_store: Entity<WorktreeStore>,
    repositories: HashMap<RepositoryId, Entity<Repository>>,
    active_repo_id: Option<RepositoryId>,
    #[allow(clippy::type_complexity)]
    loading_diffs:
        HashMap<(BufferId, DiffKind), Shared<Task<Result<Entity<BufferDiff>, Arc<anyhow::Error>>>>>,
    diffs: HashMap<BufferId, Entity<BufferGitState>>,
    shared_diffs: HashMap<proto::PeerId, HashMap<BufferId, SharedDiffs>>,
    _subscriptions: Vec<Subscription>,
}

#[derive(Default)]
struct SharedDiffs {
    unstaged: Option<Entity<BufferDiff>>,
    uncommitted: Option<Entity<BufferDiff>>,
}

struct BufferGitState {
    unstaged_diff: Option<WeakEntity<BufferDiff>>,
    uncommitted_diff: Option<WeakEntity<BufferDiff>>,
    conflict_set: Option<WeakEntity<ConflictSet>>,
    recalculate_diff_task: Option<Task<Result<()>>>,
    reparse_conflict_markers_task: Option<Task<Result<()>>>,
    language: Option<Arc<Language>>,
    language_registry: Option<Arc<LanguageRegistry>>,
    conflict_updated_futures: Vec<oneshot::Sender<()>>,
    recalculating_tx: postage::watch::Sender<bool>,

    /// These operation counts are used to ensure that head and index text
    /// values read from the git repository are up-to-date with any hunk staging
    /// operations that have been performed on the BufferDiff.
    ///
    /// The operation count is incremented immediately when the user initiates a
    /// hunk stage/unstage operation. Then, upon finishing writing the new index
    /// text to disk, the `operation count as of write` is updated to reflect
    /// the operation count that prompted the write.
    hunk_staging_operation_count: usize,
    hunk_staging_operation_count_as_of_write: usize,

    head_text: Option<Arc<String>>,
    index_text: Option<Arc<String>>,
    head_changed: bool,
    index_changed: bool,
    language_changed: bool,
}
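
// A minimal sketch of how the two counters above are intended to be compared
// (an assumption based on the doc comment; the actual check lives in the diff
// recalculation code further down in this file): a head/index text read from
// disk is only trusted if no hunk staging operation began after the write that
// produced it, e.g.
//
//     let index_read_may_be_stale = state.hunk_staging_operation_count
//         > state.hunk_staging_operation_count_as_of_write;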

#[derive(Clone, Debug)]
enum DiffBasesChange {
    SetIndex(Option<String>),
    SetHead(Option<String>),
    SetEach {
        index: Option<String>,
        head: Option<String>,
    },
    SetBoth(Option<String>),
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum DiffKind {
    Unstaged,
    Uncommitted,
}

enum GitStoreState {
    Local {
        next_repository_id: Arc<AtomicU64>,
        downstream: Option<LocalDownstreamState>,
        project_environment: Entity<ProjectEnvironment>,
        fs: Arc<dyn Fs>,
    },
    Ssh {
        upstream_client: AnyProtoClient,
        upstream_project_id: ProjectId,
        downstream: Option<(AnyProtoClient, ProjectId)>,
    },
    Remote {
        upstream_client: AnyProtoClient,
        upstream_project_id: ProjectId,
    },
}
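
// Presumably: `Local` owns the repositories and filesystem directly and may be
// shared downstream to collaborators, `Ssh` proxies git operations to an
// upstream client and can itself be re-shared downstream, while `Remote` is a
// pure guest that only talks to its upstream client.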

enum DownstreamUpdate {
    UpdateRepository(RepositorySnapshot),
    RemoveRepository(RepositoryId),
}

struct LocalDownstreamState {
    client: AnyProtoClient,
    project_id: ProjectId,
    updates_tx: mpsc::UnboundedSender<DownstreamUpdate>,
    _task: Task<Result<()>>,
}

#[derive(Clone, Debug)]
pub struct GitStoreCheckpoint {
    checkpoints_by_work_dir_abs_path: HashMap<Arc<Path>, GitRepositoryCheckpoint>,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StatusEntry {
    pub repo_path: RepoPath,
    pub status: FileStatus,
}

impl StatusEntry {
    fn to_proto(&self) -> proto::StatusEntry {
        let simple_status = match self.status {
            FileStatus::Ignored | FileStatus::Untracked => proto::GitStatus::Added as i32,
            FileStatus::Unmerged { .. } => proto::GitStatus::Conflict as i32,
            FileStatus::Tracked(TrackedStatus {
                index_status,
                worktree_status,
            }) => tracked_status_to_proto(if worktree_status != StatusCode::Unmodified {
                worktree_status
            } else {
                index_status
            }),
        };

        proto::StatusEntry {
            repo_path: self.repo_path.as_ref().to_proto(),
            simple_status,
            status: Some(status_to_proto(self.status)),
        }
    }
}

impl TryFrom<proto::StatusEntry> for StatusEntry {
    type Error = anyhow::Error;

    fn try_from(value: proto::StatusEntry) -> Result<Self, Self::Error> {
        let repo_path = RepoPath(Arc::<Path>::from_proto(value.repo_path));
        let status = status_from_proto(value.simple_status, value.status)?;
        Ok(Self { repo_path, status })
    }
}

impl sum_tree::Item for StatusEntry {
    type Summary = PathSummary<GitSummary>;

    fn summary(&self, _: &<Self::Summary as sum_tree::Summary>::Context) -> Self::Summary {
        PathSummary {
            max_path: self.repo_path.0.clone(),
            item_summary: self.status.summary(),
        }
    }
}

impl sum_tree::KeyedItem for StatusEntry {
    type Key = PathKey;

    fn key(&self) -> Self::Key {
        PathKey(self.repo_path.0.clone())
    }
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct RepositoryId(pub u64);

#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct MergeDetails {
    pub conflicted_paths: TreeSet<RepoPath>,
    pub message: Option<SharedString>,
    pub heads: Vec<Option<SharedString>>,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RepositorySnapshot {
    pub id: RepositoryId,
    pub statuses_by_path: SumTree<StatusEntry>,
    pub work_directory_abs_path: Arc<Path>,
    pub branch: Option<Branch>,
    pub head_commit: Option<CommitDetails>,
    pub scan_id: u64,
    pub merge: MergeDetails,
}

type JobId = u64;

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct JobInfo {
    pub start: Instant,
    pub message: SharedString,
}

pub struct Repository {
    this: WeakEntity<Self>,
    snapshot: RepositorySnapshot,
    commit_message_buffer: Option<Entity<Buffer>>,
    git_store: WeakEntity<GitStore>,
    // For a local repository, holds paths that have had worktree events since the last status
    // scan completed, and that should be examined during the next status scan.
    paths_needing_status_update: BTreeSet<RepoPath>,
    job_sender: mpsc::UnboundedSender<GitJob>,
    active_jobs: HashMap<JobId, JobInfo>,
    job_id: JobId,
    askpass_delegates: Arc<Mutex<HashMap<u64, AskPassDelegate>>>,
    latest_askpass_id: u64,
}

impl std::ops::Deref for Repository {
    type Target = RepositorySnapshot;

    fn deref(&self) -> &Self::Target {
        &self.snapshot
    }
}

#[derive(Clone)]
pub enum RepositoryState {
    Local {
        backend: Arc<dyn GitRepository>,
        environment: Arc<HashMap<String, String>>,
    },
    Remote {
        project_id: ProjectId,
        client: AnyProtoClient,
    },
}

#[derive(Clone, Debug)]
pub enum RepositoryEvent {
    Updated { full_scan: bool },
    MergeHeadsChanged,
}

#[derive(Clone, Debug)]
pub struct JobsUpdated;

#[derive(Debug)]
pub enum GitStoreEvent {
    ActiveRepositoryChanged(Option<RepositoryId>),
    RepositoryUpdated(RepositoryId, RepositoryEvent, bool),
    RepositoryAdded(RepositoryId),
    RepositoryRemoved(RepositoryId),
    IndexWriteError(anyhow::Error),
    JobsUpdated,
    ConflictsUpdated,
}
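
// Note: the `bool` carried by `RepositoryUpdated` indicates whether the updated
// repository is currently the active one; see `on_repository_event` below,
// which emits `self.active_repo_id == Some(id)` in that position.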

impl EventEmitter<RepositoryEvent> for Repository {}
impl EventEmitter<JobsUpdated> for Repository {}
impl EventEmitter<GitStoreEvent> for GitStore {}

pub struct GitJob {
    job: Box<dyn FnOnce(RepositoryState, &mut AsyncApp) -> Task<()>>,
    key: Option<GitJobKey>,
}

#[derive(PartialEq, Eq)]
enum GitJobKey {
    WriteIndex(RepoPath),
    ReloadBufferDiffBases,
    RefreshStatuses,
    ReloadGitState,
}
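
// A job's optional key presumably lets the per-repository job queue recognize
// redundant work (for example, successive `WriteIndex` jobs for the same path);
// the exact coalescing behavior lives in the queue implementation further down
// in this file.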

impl GitStore {
    pub fn local(
        worktree_store: &Entity<WorktreeStore>,
        buffer_store: Entity<BufferStore>,
        environment: Entity<ProjectEnvironment>,
        fs: Arc<dyn Fs>,
        cx: &mut Context<Self>,
    ) -> Self {
        Self::new(
            worktree_store.clone(),
            buffer_store,
            GitStoreState::Local {
                next_repository_id: Arc::new(AtomicU64::new(1)),
                downstream: None,
                project_environment: environment,
                fs,
            },
            cx,
        )
    }

    pub fn remote(
        worktree_store: &Entity<WorktreeStore>,
        buffer_store: Entity<BufferStore>,
        upstream_client: AnyProtoClient,
        project_id: ProjectId,
        cx: &mut Context<Self>,
    ) -> Self {
        Self::new(
            worktree_store.clone(),
            buffer_store,
            GitStoreState::Remote {
                upstream_client,
                upstream_project_id: project_id,
            },
            cx,
        )
    }

    pub fn ssh(
        worktree_store: &Entity<WorktreeStore>,
        buffer_store: Entity<BufferStore>,
        upstream_client: AnyProtoClient,
        cx: &mut Context<Self>,
    ) -> Self {
        Self::new(
            worktree_store.clone(),
            buffer_store,
            GitStoreState::Ssh {
                upstream_client,
                upstream_project_id: ProjectId(SSH_PROJECT_ID),
                downstream: None,
            },
            cx,
        )
    }

    fn new(
        worktree_store: Entity<WorktreeStore>,
        buffer_store: Entity<BufferStore>,
        state: GitStoreState,
        cx: &mut Context<Self>,
    ) -> Self {
        let _subscriptions = vec![
            cx.subscribe(&worktree_store, Self::on_worktree_store_event),
            cx.subscribe(&buffer_store, Self::on_buffer_store_event),
        ];

        GitStore {
            state,
            buffer_store,
            worktree_store,
            repositories: HashMap::default(),
            active_repo_id: None,
            _subscriptions,
            loading_diffs: HashMap::default(),
            shared_diffs: HashMap::default(),
            diffs: HashMap::default(),
        }
    }

    pub fn init(client: &AnyProtoClient) {
        client.add_entity_request_handler(Self::handle_get_remotes);
        client.add_entity_request_handler(Self::handle_get_branches);
        client.add_entity_request_handler(Self::handle_change_branch);
        client.add_entity_request_handler(Self::handle_create_branch);
        client.add_entity_request_handler(Self::handle_git_init);
        client.add_entity_request_handler(Self::handle_push);
        client.add_entity_request_handler(Self::handle_pull);
        client.add_entity_request_handler(Self::handle_fetch);
        client.add_entity_request_handler(Self::handle_stage);
        client.add_entity_request_handler(Self::handle_unstage);
        client.add_entity_request_handler(Self::handle_commit);
        client.add_entity_request_handler(Self::handle_reset);
        client.add_entity_request_handler(Self::handle_show);
        client.add_entity_request_handler(Self::handle_load_commit_diff);
        client.add_entity_request_handler(Self::handle_checkout_files);
        client.add_entity_request_handler(Self::handle_open_commit_message_buffer);
        client.add_entity_request_handler(Self::handle_set_index_text);
        client.add_entity_request_handler(Self::handle_askpass);
        client.add_entity_request_handler(Self::handle_check_for_pushed_commits);
        client.add_entity_request_handler(Self::handle_git_diff);
        client.add_entity_request_handler(Self::handle_open_unstaged_diff);
        client.add_entity_request_handler(Self::handle_open_uncommitted_diff);
        client.add_entity_message_handler(Self::handle_update_diff_bases);
        client.add_entity_request_handler(Self::handle_get_permalink_to_line);
        client.add_entity_request_handler(Self::handle_blame_buffer);
        client.add_entity_message_handler(Self::handle_update_repository);
        client.add_entity_message_handler(Self::handle_remove_repository);
    }

    pub fn is_local(&self) -> bool {
        matches!(self.state, GitStoreState::Local { .. })
    }

    pub fn shared(&mut self, project_id: u64, client: AnyProtoClient, cx: &mut Context<Self>) {
        match &mut self.state {
            GitStoreState::Ssh {
                downstream: downstream_client,
                ..
            } => {
                for repo in self.repositories.values() {
                    let update = repo.read(cx).snapshot.initial_update(project_id);
                    for update in split_repository_update(update) {
                        client.send(update).log_err();
                    }
                }
                *downstream_client = Some((client, ProjectId(project_id)));
            }
            GitStoreState::Local {
                downstream: downstream_client,
                ..
            } => {
                let mut snapshots = HashMap::default();
                let (updates_tx, mut updates_rx) = mpsc::unbounded();
                for repo in self.repositories.values() {
                    updates_tx
                        .unbounded_send(DownstreamUpdate::UpdateRepository(
                            repo.read(cx).snapshot.clone(),
                        ))
                        .ok();
                }
                *downstream_client = Some(LocalDownstreamState {
                    client: client.clone(),
                    project_id: ProjectId(project_id),
                    updates_tx,
                    _task: cx.spawn(async move |this, cx| {
                        cx.background_spawn(async move {
                            while let Some(update) = updates_rx.next().await {
                                match update {
                                    DownstreamUpdate::UpdateRepository(snapshot) => {
                                        if let Some(old_snapshot) = snapshots.get_mut(&snapshot.id)
                                        {
                                            let update =
                                                snapshot.build_update(old_snapshot, project_id);
                                            *old_snapshot = snapshot;
                                            for update in split_repository_update(update) {
                                                client.send(update)?;
                                            }
                                        } else {
                                            let update = snapshot.initial_update(project_id);
                                            for update in split_repository_update(update) {
                                                client.send(update)?;
                                            }
                                            snapshots.insert(snapshot.id, snapshot);
                                        }
                                    }
                                    DownstreamUpdate::RemoveRepository(id) => {
                                        client.send(proto::RemoveRepository {
                                            project_id,
                                            id: id.to_proto(),
                                        })?;
                                    }
                                }
                            }
                            anyhow::Ok(())
                        })
                        .await
                        .ok();
                        this.update(cx, |this, _| {
                            if let GitStoreState::Local {
                                downstream: downstream_client,
                                ..
                            } = &mut this.state
                            {
                                downstream_client.take();
                            } else {
                                unreachable!("unshared called on remote store");
                            }
                        })
                    }),
                });
            }
            GitStoreState::Remote { .. } => {
                debug_panic!("shared called on remote store");
            }
        }
    }

    pub fn unshared(&mut self, _cx: &mut Context<Self>) {
        match &mut self.state {
            GitStoreState::Local {
                downstream: downstream_client,
                ..
            } => {
                downstream_client.take();
            }
            GitStoreState::Ssh {
                downstream: downstream_client,
                ..
            } => {
                downstream_client.take();
            }
            GitStoreState::Remote { .. } => {
                debug_panic!("unshared called on remote store");
            }
        }
        self.shared_diffs.clear();
    }

    pub(crate) fn forget_shared_diffs_for(&mut self, peer_id: &proto::PeerId) {
        self.shared_diffs.remove(peer_id);
    }

    pub fn active_repository(&self) -> Option<Entity<Repository>> {
        self.active_repo_id
            .as_ref()
            .map(|id| self.repositories[&id].clone())
    }

    pub fn open_unstaged_diff(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<BufferDiff>>> {
        let buffer_id = buffer.read(cx).remote_id();
        if let Some(diff_state) = self.diffs.get(&buffer_id) {
            if let Some(unstaged_diff) = diff_state
                .read(cx)
                .unstaged_diff
                .as_ref()
                .and_then(|weak| weak.upgrade())
            {
                if let Some(task) =
                    diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
                {
                    return cx.background_executor().spawn(async move {
                        task.await;
                        Ok(unstaged_diff)
                    });
                }
                return Task::ready(Ok(unstaged_diff));
            }
        }

        let Some((repo, repo_path)) =
            self.repository_and_path_for_buffer_id(buffer.read(cx).remote_id(), cx)
        else {
            return Task::ready(Err(anyhow!("failed to find git repository for buffer")));
        };

        let task = self
            .loading_diffs
            .entry((buffer_id, DiffKind::Unstaged))
            .or_insert_with(|| {
                let staged_text = repo.update(cx, |repo, cx| {
                    repo.load_staged_text(buffer_id, repo_path, cx)
                });
                cx.spawn(async move |this, cx| {
                    Self::open_diff_internal(
                        this,
                        DiffKind::Unstaged,
                        staged_text.await.map(DiffBasesChange::SetIndex),
                        buffer,
                        cx,
                    )
                    .await
                    .map_err(Arc::new)
                })
                .shared()
            })
            .clone();

        cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
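
    // A minimal usage sketch (hypothetical caller, not part of this module):
    // open the unstaged diff for a buffer and await it from a spawned task.
    //
    //     let diff = git_store.update(cx, |store, cx| {
    //         store.open_unstaged_diff(buffer.clone(), cx)
    //     });
    //     cx.spawn(async move |_, _| {
    //         let _diff: Entity<BufferDiff> = diff.await?;
    //         anyhow::Ok(())
    //     })
    //     .detach();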

    pub fn open_uncommitted_diff(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<BufferDiff>>> {
        let buffer_id = buffer.read(cx).remote_id();

        if let Some(diff_state) = self.diffs.get(&buffer_id) {
            if let Some(uncommitted_diff) = diff_state
                .read(cx)
                .uncommitted_diff
                .as_ref()
                .and_then(|weak| weak.upgrade())
            {
                if let Some(task) =
                    diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
                {
                    return cx.background_executor().spawn(async move {
                        task.await;
                        Ok(uncommitted_diff)
                    });
                }
                return Task::ready(Ok(uncommitted_diff));
            }
        }

        let Some((repo, repo_path)) =
            self.repository_and_path_for_buffer_id(buffer.read(cx).remote_id(), cx)
        else {
            return Task::ready(Err(anyhow!("failed to find git repository for buffer")));
        };

        let task = self
            .loading_diffs
            .entry((buffer_id, DiffKind::Uncommitted))
            .or_insert_with(|| {
                let changes = repo.update(cx, |repo, cx| {
                    repo.load_committed_text(buffer_id, repo_path, cx)
                });

                cx.spawn(async move |this, cx| {
                    Self::open_diff_internal(this, DiffKind::Uncommitted, changes.await, buffer, cx)
                        .await
                        .map_err(Arc::new)
                })
                .shared()
            })
            .clone();

        cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }

    async fn open_diff_internal(
        this: WeakEntity<Self>,
        kind: DiffKind,
        texts: Result<DiffBasesChange>,
        buffer_entity: Entity<Buffer>,
        cx: &mut AsyncApp,
    ) -> Result<Entity<BufferDiff>> {
        let diff_bases_change = match texts {
            Err(e) => {
                this.update(cx, |this, cx| {
                    let buffer = buffer_entity.read(cx);
                    let buffer_id = buffer.remote_id();
                    this.loading_diffs.remove(&(buffer_id, kind));
                })?;
                return Err(e);
            }
            Ok(change) => change,
        };

        this.update(cx, |this, cx| {
            let buffer = buffer_entity.read(cx);
            let buffer_id = buffer.remote_id();
            let language = buffer.language().cloned();
            let language_registry = buffer.language_registry();
            let text_snapshot = buffer.text_snapshot();
            this.loading_diffs.remove(&(buffer_id, kind));

            let git_store = cx.weak_entity();
            let diff_state = this
                .diffs
                .entry(buffer_id)
                .or_insert_with(|| cx.new(|_| BufferGitState::new(git_store)));

            let diff = cx.new(|cx| BufferDiff::new(&text_snapshot, cx));

            cx.subscribe(&diff, Self::on_buffer_diff_event).detach();
            diff_state.update(cx, |diff_state, cx| {
                diff_state.language = language;
                diff_state.language_registry = language_registry;

                match kind {
                    DiffKind::Unstaged => diff_state.unstaged_diff = Some(diff.downgrade()),
                    DiffKind::Uncommitted => {
                        let unstaged_diff = if let Some(diff) = diff_state.unstaged_diff() {
                            diff
                        } else {
                            let unstaged_diff = cx.new(|cx| BufferDiff::new(&text_snapshot, cx));
                            diff_state.unstaged_diff = Some(unstaged_diff.downgrade());
                            unstaged_diff
                        };

                        diff.update(cx, |diff, _| diff.set_secondary_diff(unstaged_diff));
                        diff_state.uncommitted_diff = Some(diff.downgrade())
                    }
                }

                diff_state.diff_bases_changed(text_snapshot, Some(diff_bases_change), cx);
                let rx = diff_state.wait_for_recalculation();

                anyhow::Ok(async move {
                    if let Some(rx) = rx {
                        rx.await;
                    }
                    Ok(diff)
                })
            })
        })??
        .await
    }

    pub fn get_unstaged_diff(&self, buffer_id: BufferId, cx: &App) -> Option<Entity<BufferDiff>> {
        let diff_state = self.diffs.get(&buffer_id)?;
        diff_state.read(cx).unstaged_diff.as_ref()?.upgrade()
    }

    pub fn get_uncommitted_diff(
        &self,
        buffer_id: BufferId,
        cx: &App,
    ) -> Option<Entity<BufferDiff>> {
        let diff_state = self.diffs.get(&buffer_id)?;
        diff_state.read(cx).uncommitted_diff.as_ref()?.upgrade()
    }

    pub fn open_conflict_set(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Entity<ConflictSet> {
        log::debug!("open conflict set");
        let buffer_id = buffer.read(cx).remote_id();

        if let Some(git_state) = self.diffs.get(&buffer_id) {
            if let Some(conflict_set) = git_state
                .read(cx)
                .conflict_set
                .as_ref()
                .and_then(|weak| weak.upgrade())
            {
                let conflict_set = conflict_set.clone();
                let buffer_snapshot = buffer.read(cx).text_snapshot();

                git_state.update(cx, |state, cx| {
                    let _ = state.reparse_conflict_markers(buffer_snapshot, cx);
                });

                return conflict_set;
            }
        }

        let is_unmerged = self
            .repository_and_path_for_buffer_id(buffer_id, cx)
            .map_or(false, |(repo, path)| {
                repo.read(cx)
                    .snapshot
                    .merge
                    .conflicted_paths
                    .contains(&path)
            });
        let git_store = cx.weak_entity();
        let buffer_git_state = self
            .diffs
            .entry(buffer_id)
            .or_insert_with(|| cx.new(|_| BufferGitState::new(git_store)));
        let conflict_set = cx.new(|cx| ConflictSet::new(buffer_id, is_unmerged, cx));

        self._subscriptions
            .push(cx.subscribe(&conflict_set, |_, _, _, cx| {
                cx.emit(GitStoreEvent::ConflictsUpdated);
            }));

        buffer_git_state.update(cx, |state, cx| {
            state.conflict_set = Some(conflict_set.downgrade());
            let buffer_snapshot = buffer.read(cx).text_snapshot();
            let _ = state.reparse_conflict_markers(buffer_snapshot, cx);
        });

        conflict_set
    }

    pub fn project_path_git_status(
        &self,
        project_path: &ProjectPath,
        cx: &App,
    ) -> Option<FileStatus> {
        let (repo, repo_path) = self.repository_and_path_for_project_path(project_path, cx)?;
        Some(repo.read(cx).status_for_path(&repo_path)?.status)
    }

    pub fn checkpoint(&self, cx: &mut App) -> Task<Result<GitStoreCheckpoint>> {
        let mut work_directory_abs_paths = Vec::new();
        let mut checkpoints = Vec::new();
        for repository in self.repositories.values() {
            repository.update(cx, |repository, _| {
                work_directory_abs_paths.push(repository.snapshot.work_directory_abs_path.clone());
                checkpoints.push(repository.checkpoint().map(|checkpoint| checkpoint?));
            });
        }

        cx.background_executor().spawn(async move {
            let checkpoints = future::try_join_all(checkpoints).await?;
            Ok(GitStoreCheckpoint {
                checkpoints_by_work_dir_abs_path: work_directory_abs_paths
                    .into_iter()
                    .zip(checkpoints)
                    .collect(),
            })
        })
    }
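
    // A rough sketch of how checkpointing is meant to be used (hypothetical
    // caller): capture a checkpoint across all repositories, mutate the working
    // copies, then restore via `restore_checkpoint` below.
    //
    //     let checkpoint = git_store.update(cx, |store, cx| store.checkpoint(cx));
    //     cx.spawn(async move |_, cx| {
    //         let checkpoint = checkpoint.await?;
    //         // ...modify files on disk...
    //         git_store
    //             .update(cx, |store, cx| store.restore_checkpoint(checkpoint, cx))?
    //             .await
    //     })
    //     .detach();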

    pub fn restore_checkpoint(
        &self,
        checkpoint: GitStoreCheckpoint,
        cx: &mut App,
    ) -> Task<Result<()>> {
        let repositories_by_work_dir_abs_path = self
            .repositories
            .values()
            .map(|repo| (repo.read(cx).snapshot.work_directory_abs_path.clone(), repo))
            .collect::<HashMap<_, _>>();

        let mut tasks = Vec::new();
        for (work_dir_abs_path, checkpoint) in checkpoint.checkpoints_by_work_dir_abs_path {
            if let Some(repository) = repositories_by_work_dir_abs_path.get(&work_dir_abs_path) {
                let restore = repository.update(cx, |repository, _| {
                    repository.restore_checkpoint(checkpoint)
                });
                tasks.push(async move { restore.await? });
            }
        }
        cx.background_spawn(async move {
            future::try_join_all(tasks).await?;
            Ok(())
        })
    }

    /// Compares two checkpoints, returning true if they are equal.
    pub fn compare_checkpoints(
        &self,
        left: GitStoreCheckpoint,
        mut right: GitStoreCheckpoint,
        cx: &mut App,
    ) -> Task<Result<bool>> {
        let repositories_by_work_dir_abs_path = self
            .repositories
            .values()
            .map(|repo| (repo.read(cx).snapshot.work_directory_abs_path.clone(), repo))
            .collect::<HashMap<_, _>>();

        let mut tasks = Vec::new();
        for (work_dir_abs_path, left_checkpoint) in left.checkpoints_by_work_dir_abs_path {
            if let Some(right_checkpoint) = right
                .checkpoints_by_work_dir_abs_path
                .remove(&work_dir_abs_path)
            {
                if let Some(repository) = repositories_by_work_dir_abs_path.get(&work_dir_abs_path)
                {
                    let compare = repository.update(cx, |repository, _| {
                        repository.compare_checkpoints(left_checkpoint, right_checkpoint)
                    });

                    tasks.push(async move { compare.await? });
                }
            } else {
                return Task::ready(Ok(false));
            }
        }
        cx.background_spawn(async move {
            Ok(future::try_join_all(tasks)
                .await?
                .into_iter()
                .all(|result| result))
        })
    }

    /// Blames a buffer.
    pub fn blame_buffer(
        &self,
        buffer: &Entity<Buffer>,
        version: Option<clock::Global>,
        cx: &mut App,
    ) -> Task<Result<Option<Blame>>> {
        let buffer = buffer.read(cx);
        let Some((repo, repo_path)) =
            self.repository_and_path_for_buffer_id(buffer.remote_id(), cx)
        else {
            return Task::ready(Err(anyhow!("failed to find a git repository for buffer")));
        };
        let content = match &version {
            Some(version) => buffer.rope_for_version(version).clone(),
            None => buffer.as_rope().clone(),
        };
        let version = version.unwrap_or(buffer.version());
        let buffer_id = buffer.remote_id();

        let rx = repo.update(cx, |repo, _| {
            repo.send_job(None, move |state, _| async move {
                match state {
                    RepositoryState::Local { backend, .. } => backend
                        .blame(repo_path.clone(), content)
                        .await
                        .with_context(|| format!("Failed to blame {:?}", repo_path.0))
                        .map(Some),
                    RepositoryState::Remote { project_id, client } => {
                        let response = client
                            .request(proto::BlameBuffer {
                                project_id: project_id.to_proto(),
                                buffer_id: buffer_id.into(),
                                version: serialize_version(&version),
                            })
                            .await?;
                        Ok(deserialize_blame_buffer_response(response))
                    }
                }
            })
        });

        cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
    }

    pub fn get_permalink_to_line(
        &self,
        buffer: &Entity<Buffer>,
        selection: Range<u32>,
        cx: &mut App,
    ) -> Task<Result<url::Url>> {
        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        let Some((repo, repo_path)) = self.repository_and_path_for_project_path(
            &(file.worktree.read(cx).id(), file.path.clone()).into(),
            cx,
        ) else {
            // If we're not in a Git repo, check whether this is a Rust source
            // file in the Cargo registry (presumably opened with go-to-definition
            // from a normal Rust file). If so, we can put together a permalink
            // using crate metadata.
            if buffer
                .read(cx)
                .language()
                .is_none_or(|lang| lang.name() != "Rust".into())
            {
                return Task::ready(Err(anyhow!("no permalink available")));
            }
            let Some(file_path) = file.worktree.read(cx).absolutize(&file.path).ok() else {
                return Task::ready(Err(anyhow!("no permalink available")));
            };
            return cx.spawn(async move |cx| {
                let provider_registry = cx.update(GitHostingProviderRegistry::default_global)?;
                get_permalink_in_rust_registry_src(provider_registry, file_path, selection)
                    .context("no permalink available")
            });

            // TODO remote case
        };

        let buffer_id = buffer.read(cx).remote_id();
        let branch = repo.read(cx).branch.clone();
        let remote = branch
            .as_ref()
            .and_then(|b| b.upstream.as_ref())
            .and_then(|b| b.remote_name())
            .unwrap_or("origin")
            .to_string();

        let rx = repo.update(cx, |repo, _| {
            repo.send_job(None, move |state, cx| async move {
                match state {
                    RepositoryState::Local { backend, .. } => {
                        let origin_url = backend
                            .remote_url(&remote)
                            .with_context(|| format!("remote \"{remote}\" not found"))?;

                        let sha = backend.head_sha().await.context("reading HEAD SHA")?;

                        let provider_registry =
                            cx.update(GitHostingProviderRegistry::default_global)?;

                        let (provider, remote) =
                            parse_git_remote_url(provider_registry, &origin_url)
                                .context("parsing Git remote URL")?;

                        let path = repo_path.to_str().with_context(|| {
                            format!("converting repo path {repo_path:?} to string")
                        })?;

                        Ok(provider.build_permalink(
                            remote,
                            BuildPermalinkParams {
                                sha: &sha,
                                path,
                                selection: Some(selection),
                            },
                        ))
                    }
                    RepositoryState::Remote { project_id, client } => {
                        let response = client
                            .request(proto::GetPermalinkToLine {
                                project_id: project_id.to_proto(),
                                buffer_id: buffer_id.into(),
                                selection: Some(proto::Range {
                                    start: selection.start as u64,
                                    end: selection.end as u64,
                                }),
                            })
                            .await?;

                        url::Url::parse(&response.permalink).context("failed to parse permalink")
                    }
                }
            })
        });
        cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
    }

    fn downstream_client(&self) -> Option<(AnyProtoClient, ProjectId)> {
        match &self.state {
            GitStoreState::Local {
                downstream: downstream_client,
                ..
            } => downstream_client
                .as_ref()
                .map(|state| (state.client.clone(), state.project_id)),
            GitStoreState::Ssh {
                downstream: downstream_client,
                ..
            } => downstream_client.clone(),
            GitStoreState::Remote { .. } => None,
        }
    }

    fn upstream_client(&self) -> Option<AnyProtoClient> {
        match &self.state {
            GitStoreState::Local { .. } => None,
            GitStoreState::Ssh {
                upstream_client, ..
            }
            | GitStoreState::Remote {
                upstream_client, ..
            } => Some(upstream_client.clone()),
        }
    }

    fn on_worktree_store_event(
        &mut self,
        worktree_store: Entity<WorktreeStore>,
        event: &WorktreeStoreEvent,
        cx: &mut Context<Self>,
    ) {
        let GitStoreState::Local {
            project_environment,
            downstream,
            next_repository_id,
            fs,
        } = &self.state
        else {
            return;
        };

        match event {
            WorktreeStoreEvent::WorktreeUpdatedEntries(worktree_id, updated_entries) => {
                let mut paths_by_git_repo = HashMap::<_, Vec<_>>::default();
                for (relative_path, _, _) in updated_entries.iter() {
                    let Some((repo, repo_path)) = self.repository_and_path_for_project_path(
                        &(*worktree_id, relative_path.clone()).into(),
                        cx,
                    ) else {
                        continue;
                    };
                    paths_by_git_repo.entry(repo).or_default().push(repo_path)
                }

                for (repo, paths) in paths_by_git_repo {
                    repo.update(cx, |repo, cx| {
                        repo.paths_changed(
                            paths,
                            downstream
                                .as_ref()
                                .map(|downstream| downstream.updates_tx.clone()),
                            cx,
                        );
                    });
                }
            }
            WorktreeStoreEvent::WorktreeUpdatedGitRepositories(worktree_id, changed_repos) => {
                let Some(worktree) = worktree_store.read(cx).worktree_for_id(*worktree_id, cx)
                else {
                    return;
                };
                if !worktree.read(cx).is_visible() {
                    log::debug!(
                        "not adding repositories for local worktree {:?} because it's not visible",
                        worktree.read(cx).abs_path()
                    );
                    return;
                }
                self.update_repositories_from_worktree(
                    project_environment.clone(),
                    next_repository_id.clone(),
                    downstream
                        .as_ref()
                        .map(|downstream| downstream.updates_tx.clone()),
                    changed_repos.clone(),
                    fs.clone(),
                    cx,
                );
                self.local_worktree_git_repos_changed(worktree, changed_repos, cx);
            }
            _ => {}
        }
    }

    fn on_repository_event(
        &mut self,
        repo: Entity<Repository>,
        event: &RepositoryEvent,
        cx: &mut Context<Self>,
    ) {
        let id = repo.read(cx).id;
        let merge_conflicts = repo.read(cx).snapshot.merge.conflicted_paths.clone();
        for (buffer_id, diff) in self.diffs.iter() {
            if let Some((buffer_repo, repo_path)) =
                self.repository_and_path_for_buffer_id(*buffer_id, cx)
            {
                if buffer_repo == repo {
                    diff.update(cx, |diff, cx| {
                        if let Some(conflict_set) = &diff.conflict_set {
                            let conflict_status_changed =
                                conflict_set.update(cx, |conflict_set, cx| {
                                    let has_conflict = merge_conflicts.contains(&repo_path);
                                    conflict_set.set_has_conflict(has_conflict, cx)
                                })?;
                            if conflict_status_changed {
                                let buffer_store = self.buffer_store.read(cx);
                                if let Some(buffer) = buffer_store.get(*buffer_id) {
                                    let _ = diff.reparse_conflict_markers(
                                        buffer.read(cx).text_snapshot(),
                                        cx,
                                    );
                                }
                            }
                        }
                        anyhow::Ok(())
                    })
                    .ok();
                }
            }
        }
        cx.emit(GitStoreEvent::RepositoryUpdated(
            id,
            event.clone(),
            self.active_repo_id == Some(id),
        ))
    }

    fn on_jobs_updated(&mut self, _: Entity<Repository>, _: &JobsUpdated, cx: &mut Context<Self>) {
        cx.emit(GitStoreEvent::JobsUpdated)
    }

    /// Update our list of repositories and schedule git scans in response to a notification from a worktree.
    fn update_repositories_from_worktree(
        &mut self,
        project_environment: Entity<ProjectEnvironment>,
        next_repository_id: Arc<AtomicU64>,
        updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
        updated_git_repositories: UpdatedGitRepositoriesSet,
        fs: Arc<dyn Fs>,
        cx: &mut Context<Self>,
    ) {
        let mut removed_ids = Vec::new();
        for update in updated_git_repositories.iter() {
            if let Some((id, existing)) = self.repositories.iter().find(|(_, repo)| {
                let existing_work_directory_abs_path =
                    repo.read(cx).work_directory_abs_path.clone();
                Some(&existing_work_directory_abs_path)
                    == update.old_work_directory_abs_path.as_ref()
                    || Some(&existing_work_directory_abs_path)
                        == update.new_work_directory_abs_path.as_ref()
            }) {
                if let Some(new_work_directory_abs_path) =
                    update.new_work_directory_abs_path.clone()
                {
                    existing.update(cx, |existing, cx| {
                        existing.snapshot.work_directory_abs_path = new_work_directory_abs_path;
                        existing.schedule_scan(updates_tx.clone(), cx);
                    });
                } else {
                    removed_ids.push(*id);
                }
            } else if let UpdatedGitRepository {
                new_work_directory_abs_path: Some(work_directory_abs_path),
                dot_git_abs_path: Some(dot_git_abs_path),
                repository_dir_abs_path: Some(repository_dir_abs_path),
                common_dir_abs_path: Some(common_dir_abs_path),
                ..
            } = update
            {
                let id = RepositoryId(next_repository_id.fetch_add(1, atomic::Ordering::Release));
                let git_store = cx.weak_entity();
                let repo = cx.new(|cx| {
                    let mut repo = Repository::local(
                        id,
                        work_directory_abs_path.clone(),
                        dot_git_abs_path.clone(),
                        repository_dir_abs_path.clone(),
                        common_dir_abs_path.clone(),
                        project_environment.downgrade(),
                        fs.clone(),
                        git_store,
                        cx,
                    );
                    repo.schedule_scan(updates_tx.clone(), cx);
                    repo
                });
                self._subscriptions
                    .push(cx.subscribe(&repo, Self::on_repository_event));
                self._subscriptions
                    .push(cx.subscribe(&repo, Self::on_jobs_updated));
                self.repositories.insert(id, repo);
                cx.emit(GitStoreEvent::RepositoryAdded(id));
                self.active_repo_id.get_or_insert_with(|| {
                    cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
                    id
                });
            }
        }

        for id in removed_ids {
            if self.active_repo_id == Some(id) {
                self.active_repo_id = None;
                cx.emit(GitStoreEvent::ActiveRepositoryChanged(None));
            }
            self.repositories.remove(&id);
            if let Some(updates_tx) = updates_tx.as_ref() {
                updates_tx
                    .unbounded_send(DownstreamUpdate::RemoveRepository(id))
                    .ok();
            }
        }
    }

    fn on_buffer_store_event(
        &mut self,
        _: Entity<BufferStore>,
        event: &BufferStoreEvent,
        cx: &mut Context<Self>,
    ) {
        match event {
            BufferStoreEvent::BufferAdded(buffer) => {
                cx.subscribe(&buffer, |this, buffer, event, cx| {
                    if let BufferEvent::LanguageChanged = event {
                        let buffer_id = buffer.read(cx).remote_id();
                        if let Some(diff_state) = this.diffs.get(&buffer_id) {
                            diff_state.update(cx, |diff_state, cx| {
                                diff_state.buffer_language_changed(buffer, cx);
                            });
                        }
                    }
                })
                .detach();
            }
            BufferStoreEvent::SharedBufferClosed(peer_id, buffer_id) => {
                if let Some(diffs) = self.shared_diffs.get_mut(peer_id) {
                    diffs.remove(buffer_id);
                }
            }
            BufferStoreEvent::BufferDropped(buffer_id) => {
                self.diffs.remove(&buffer_id);
                for diffs in self.shared_diffs.values_mut() {
                    diffs.remove(buffer_id);
                }
            }

            _ => {}
        }
    }

    pub fn recalculate_buffer_diffs(
        &mut self,
        buffers: Vec<Entity<Buffer>>,
        cx: &mut Context<Self>,
    ) -> impl Future<Output = ()> + use<> {
        let mut futures = Vec::new();
        for buffer in buffers {
            if let Some(diff_state) = self.diffs.get_mut(&buffer.read(cx).remote_id()) {
                let buffer = buffer.read(cx).text_snapshot();
                diff_state.update(cx, |diff_state, cx| {
                    diff_state.recalculate_diffs(buffer.clone(), cx);
                    futures.extend(diff_state.wait_for_recalculation().map(FutureExt::boxed));
                });
                futures.push(diff_state.update(cx, |diff_state, cx| {
                    diff_state
                        .reparse_conflict_markers(buffer, cx)
                        .map(|_| {})
                        .boxed()
                }));
            }
        }
        async move {
            futures::future::join_all(futures).await;
        }
    }

    fn on_buffer_diff_event(
        &mut self,
        diff: Entity<buffer_diff::BufferDiff>,
        event: &BufferDiffEvent,
        cx: &mut Context<Self>,
    ) {
        if let BufferDiffEvent::HunksStagedOrUnstaged(new_index_text) = event {
            let buffer_id = diff.read(cx).buffer_id;
            if let Some(diff_state) = self.diffs.get(&buffer_id) {
                let hunk_staging_operation_count = diff_state.update(cx, |diff_state, _| {
                    diff_state.hunk_staging_operation_count += 1;
                    diff_state.hunk_staging_operation_count
                });
                if let Some((repo, path)) = self.repository_and_path_for_buffer_id(buffer_id, cx) {
                    let recv = repo.update(cx, |repo, cx| {
                        log::debug!("hunks changed for {}", path.display());
                        repo.spawn_set_index_text_job(
                            path,
                            new_index_text.as_ref().map(|rope| rope.to_string()),
                            Some(hunk_staging_operation_count),
                            cx,
                        )
                    });
                    let diff = diff.downgrade();
                    cx.spawn(async move |this, cx| {
                        if let Ok(Err(error)) = cx.background_spawn(recv).await {
                            diff.update(cx, |diff, cx| {
                                diff.clear_pending_hunks(cx);
                            })
                            .ok();
                            this.update(cx, |_, cx| cx.emit(GitStoreEvent::IndexWriteError(error)))
                                .ok();
                        }
                    })
                    .detach();
                }
            }
        }
    }

    fn local_worktree_git_repos_changed(
        &mut self,
        worktree: Entity<Worktree>,
        changed_repos: &UpdatedGitRepositoriesSet,
        cx: &mut Context<Self>,
    ) {
        log::debug!("local worktree repos changed");
        debug_assert!(worktree.read(cx).is_local());

        for repository in self.repositories.values() {
            repository.update(cx, |repository, cx| {
                let repo_abs_path = &repository.work_directory_abs_path;
                if changed_repos.iter().any(|update| {
                    update.old_work_directory_abs_path.as_ref() == Some(&repo_abs_path)
                        || update.new_work_directory_abs_path.as_ref() == Some(&repo_abs_path)
                }) {
                    repository.reload_buffer_diff_bases(cx);
                }
            });
        }
    }

    pub fn repositories(&self) -> &HashMap<RepositoryId, Entity<Repository>> {
        &self.repositories
    }

    pub fn status_for_buffer_id(&self, buffer_id: BufferId, cx: &App) -> Option<FileStatus> {
        let (repo, path) = self.repository_and_path_for_buffer_id(buffer_id, cx)?;
        let status = repo.read(cx).snapshot.status_for_path(&path)?;
        Some(status.status)
    }

    pub fn repository_and_path_for_buffer_id(
        &self,
        buffer_id: BufferId,
        cx: &App,
    ) -> Option<(Entity<Repository>, RepoPath)> {
        let buffer = self.buffer_store.read(cx).get(buffer_id)?;
        let project_path = buffer.read(cx).project_path(cx)?;
        self.repository_and_path_for_project_path(&project_path, cx)
    }

    pub fn repository_and_path_for_project_path(
        &self,
        path: &ProjectPath,
        cx: &App,
    ) -> Option<(Entity<Repository>, RepoPath)> {
        let abs_path = self.worktree_store.read(cx).absolutize(path, cx)?;
        self.repositories
            .values()
            .filter_map(|repo| {
                let repo_path = repo.read(cx).abs_path_to_repo_path(&abs_path)?;
                Some((repo.clone(), repo_path))
            })
            .max_by_key(|(repo, _)| repo.read(cx).work_directory_abs_path.clone())
    }
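
    // The `max_by_key` above effectively selects the innermost repository
    // containing the path: every candidate work directory is an ancestor of
    // `abs_path`, so they form a chain, and the most deeply nested one compares
    // greatest in path ordering.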

    pub fn git_init(
        &self,
        path: Arc<Path>,
        fallback_branch_name: String,
        cx: &App,
    ) -> Task<Result<()>> {
        match &self.state {
            GitStoreState::Local { fs, .. } => {
                let fs = fs.clone();
                cx.background_executor()
                    .spawn(async move { fs.git_init(&path, fallback_branch_name) })
            }
            GitStoreState::Ssh {
                upstream_client,
                upstream_project_id: project_id,
                ..
            }
            | GitStoreState::Remote {
                upstream_client,
                upstream_project_id: project_id,
                ..
            } => {
                let client = upstream_client.clone();
                let project_id = *project_id;
                cx.background_executor().spawn(async move {
                    client
                        .request(proto::GitInit {
                            project_id: project_id.0,
                            abs_path: path.to_string_lossy().to_string(),
                            fallback_branch_name,
                        })
                        .await?;
                    Ok(())
                })
            }
        }
    }

    async fn handle_update_repository(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::UpdateRepository>,
        mut cx: AsyncApp,
    ) -> Result<()> {
        this.update(&mut cx, |this, cx| {
            let mut update = envelope.payload;

            let id = RepositoryId::from_proto(update.id);
            let client = this
                .upstream_client()
                .context("no upstream client")?
                .clone();

            let mut is_new = false;
            let repo = this.repositories.entry(id).or_insert_with(|| {
                is_new = true;
                let git_store = cx.weak_entity();
                cx.new(|cx| {
                    Repository::remote(
                        id,
                        Path::new(&update.abs_path).into(),
                        ProjectId(update.project_id),
                        client,
                        git_store,
                        cx,
                    )
                })
            });
            if is_new {
                this._subscriptions
                    .push(cx.subscribe(&repo, Self::on_repository_event))
            }

            repo.update(cx, {
                let update = update.clone();
                |repo, cx| repo.apply_remote_update(update, cx)
            })?;

            this.active_repo_id.get_or_insert_with(|| {
                cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
                id
            });

            if let Some((client, project_id)) = this.downstream_client() {
                update.project_id = project_id.to_proto();
                client.send(update).log_err();
            }
            Ok(())
        })?
    }

    async fn handle_remove_repository(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::RemoveRepository>,
        mut cx: AsyncApp,
    ) -> Result<()> {
        this.update(&mut cx, |this, cx| {
            let mut update = envelope.payload;
            let id = RepositoryId::from_proto(update.id);
            this.repositories.remove(&id);
            if let Some((client, project_id)) = this.downstream_client() {
                update.project_id = project_id.to_proto();
                client.send(update).log_err();
            }
            if this.active_repo_id == Some(id) {
                this.active_repo_id = None;
                cx.emit(GitStoreEvent::ActiveRepositoryChanged(None));
            }
            cx.emit(GitStoreEvent::RepositoryRemoved(id));
        })
    }

    async fn handle_git_init(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::GitInit>,
        cx: AsyncApp,
    ) -> Result<proto::Ack> {
        let path: Arc<Path> = PathBuf::from(envelope.payload.abs_path).into();
        let name = envelope.payload.fallback_branch_name;
        cx.update(|cx| this.read(cx).git_init(path, name, cx))?
            .await?;

        Ok(proto::Ack {})
    }

    async fn handle_fetch(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::Fetch>,
        mut cx: AsyncApp,
    ) -> Result<proto::RemoteMessageResponse> {
        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
        let askpass_id = envelope.payload.askpass_id;

        let askpass = make_remote_delegate(
            this,
            envelope.payload.project_id,
            repository_id,
            askpass_id,
            &mut cx,
        );

        let remote_output = repository_handle
            .update(&mut cx, |repository_handle, cx| {
                repository_handle.fetch(askpass, cx)
            })?
            .await??;

        Ok(proto::RemoteMessageResponse {
            stdout: remote_output.stdout,
            stderr: remote_output.stderr,
        })
    }

    async fn handle_push(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::Push>,
        mut cx: AsyncApp,
    ) -> Result<proto::RemoteMessageResponse> {
        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;

        let askpass_id = envelope.payload.askpass_id;
        let askpass = make_remote_delegate(
            this,
            envelope.payload.project_id,
            repository_id,
            askpass_id,
            &mut cx,
        );

        let options = envelope
            .payload
            .options
            .as_ref()
            .map(|_| match envelope.payload.options() {
                proto::push::PushOptions::SetUpstream => git::repository::PushOptions::SetUpstream,
                proto::push::PushOptions::Force => git::repository::PushOptions::Force,
            });

        let branch_name = envelope.payload.branch_name.into();
        let remote_name = envelope.payload.remote_name.into();

        let remote_output = repository_handle
            .update(&mut cx, |repository_handle, cx| {
                repository_handle.push(branch_name, remote_name, options, askpass, cx)
            })?
            .await??;
        Ok(proto::RemoteMessageResponse {
            stdout: remote_output.stdout,
            stderr: remote_output.stderr,
        })
    }

    async fn handle_pull(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::Pull>,
        mut cx: AsyncApp,
    ) -> Result<proto::RemoteMessageResponse> {
        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
        let askpass_id = envelope.payload.askpass_id;
        let askpass = make_remote_delegate(
            this,
            envelope.payload.project_id,
            repository_id,
            askpass_id,
            &mut cx,
        );

        let branch_name = envelope.payload.branch_name.into();
        let remote_name = envelope.payload.remote_name.into();

        let remote_message = repository_handle
            .update(&mut cx, |repository_handle, cx| {
                repository_handle.pull(branch_name, remote_name, askpass, cx)
            })?
            .await??;

        Ok(proto::RemoteMessageResponse {
            stdout: remote_message.stdout,
            stderr: remote_message.stderr,
        })
    }

    async fn handle_stage(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::Stage>,
        mut cx: AsyncApp,
    ) -> Result<proto::Ack> {
        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;

        let entries = envelope
            .payload
            .paths
            .into_iter()
            .map(PathBuf::from)
            .map(RepoPath::new)
            .collect();

        repository_handle
            .update(&mut cx, |repository_handle, cx| {
                repository_handle.stage_entries(entries, cx)
            })?
            .await?;
        Ok(proto::Ack {})
    }

    async fn handle_unstage(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::Unstage>,
        mut cx: AsyncApp,
    ) -> Result<proto::Ack> {
        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;

        let entries = envelope
            .payload
            .paths
            .into_iter()
            .map(PathBuf::from)
            .map(RepoPath::new)
            .collect();

        repository_handle
            .update(&mut cx, |repository_handle, cx| {
                repository_handle.unstage_entries(entries, cx)
            })?
            .await?;

        Ok(proto::Ack {})
    }

    async fn handle_set_index_text(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::SetIndexText>,
        mut cx: AsyncApp,
    ) -> Result<proto::Ack> {
        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
        let repo_path = RepoPath::from_str(&envelope.payload.path);

        repository_handle
            .update(&mut cx, |repository_handle, cx| {
                repository_handle.spawn_set_index_text_job(
                    repo_path,
                    envelope.payload.text,
                    None,
                    cx,
                )
            })?
            .await??;
        Ok(proto::Ack {})
    }

    async fn handle_commit(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::Commit>,
        mut cx: AsyncApp,
    ) -> Result<proto::Ack> {
        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;

        let message = SharedString::from(envelope.payload.message);
        let name = envelope.payload.name.map(SharedString::from);
        let email = envelope.payload.email.map(SharedString::from);
        let options = envelope.payload.options.unwrap_or_default();

        repository_handle
            .update(&mut cx, |repository_handle, cx| {
                repository_handle.commit(
                    message,
                    name.zip(email),
                    CommitOptions {
                        amend: options.amend,
                    },
                    cx,
                )
            })?
            .await??;
        Ok(proto::Ack {})
    }

    async fn handle_get_remotes(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::GetRemotes>,
        mut cx: AsyncApp,
    ) -> Result<proto::GetRemotesResponse> {
        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;

        let branch_name = envelope.payload.branch_name;

        let remotes = repository_handle
            .update(&mut cx, |repository_handle, _| {
                repository_handle.get_remotes(branch_name)
            })?
            .await??;

        Ok(proto::GetRemotesResponse {
            remotes: remotes
                .into_iter()
                .map(|remotes| proto::get_remotes_response::Remote {
                    name: remotes.name.to_string(),
                })
                .collect::<Vec<_>>(),
        })
    }

    async fn handle_get_branches(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::GitGetBranches>,
        mut cx: AsyncApp,
    ) -> Result<proto::GitBranchesResponse> {
        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;

        let branches = repository_handle
            .update(&mut cx, |repository_handle, _| repository_handle.branches())?
            .await??;

        Ok(proto::GitBranchesResponse {
            branches: branches
                .into_iter()
                .map(|branch| branch_to_proto(&branch))
                .collect::<Vec<_>>(),
        })
    }

    async fn handle_create_branch(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::GitCreateBranch>,
        mut cx: AsyncApp,
    ) -> Result<proto::Ack> {
        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
        let branch_name = envelope.payload.branch_name;

        repository_handle
            .update(&mut cx, |repository_handle, _| {
                repository_handle.create_branch(branch_name)
            })?
            .await??;

        Ok(proto::Ack {})
    }

    async fn handle_change_branch(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::GitChangeBranch>,
        mut cx: AsyncApp,
    ) -> Result<proto::Ack> {
        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
        let branch_name = envelope.payload.branch_name;

        repository_handle
            .update(&mut cx, |repository_handle, _| {
                repository_handle.change_branch(branch_name)
            })?
            .await??;

        Ok(proto::Ack {})
    }

    async fn handle_show(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::GitShow>,
        mut cx: AsyncApp,
    ) -> Result<proto::GitCommitDetails> {
        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;

        let commit = repository_handle
            .update(&mut cx, |repository_handle, _| {
                repository_handle.show(envelope.payload.commit)
            })?
            .await??;
        Ok(proto::GitCommitDetails {
            sha: commit.sha.into(),
            message: commit.message.into(),
            commit_timestamp: commit.commit_timestamp,
            author_email: commit.author_email.into(),
            author_name: commit.author_name.into(),
        })
    }

    async fn handle_load_commit_diff(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::LoadCommitDiff>,
        mut cx: AsyncApp,
    ) -> Result<proto::LoadCommitDiffResponse> {
        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;

        let commit_diff = repository_handle
            .update(&mut cx, |repository_handle, _| {
                repository_handle.load_commit_diff(envelope.payload.commit)
            })?
            .await??;
        Ok(proto::LoadCommitDiffResponse {
            files: commit_diff
                .files
                .into_iter()
                .map(|file| proto::CommitFile {
                    path: file.path.to_string(),
                    old_text: file.old_text,
                    new_text: file.new_text,
                })
                .collect(),
        })
    }

    async fn handle_reset(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::GitReset>,
        mut cx: AsyncApp,
    ) -> Result<proto::Ack> {
        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;

        let mode = match envelope.payload.mode() {
            git_reset::ResetMode::Soft => ResetMode::Soft,
            git_reset::ResetMode::Mixed => ResetMode::Mixed,
        };

        repository_handle
            .update(&mut cx, |repository_handle, cx| {
                repository_handle.reset(envelope.payload.commit, mode, cx)
            })?
            .await??;
        Ok(proto::Ack {})
    }

    async fn handle_checkout_files(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::GitCheckoutFiles>,
        mut cx: AsyncApp,
    ) -> Result<proto::Ack> {
        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
        let paths = envelope
            .payload
            .paths
            .iter()
            .map(|s| RepoPath::from_str(s))
            .collect();

        repository_handle
            .update(&mut cx, |repository_handle, cx| {
                repository_handle.checkout_files(&envelope.payload.commit, paths, cx)
            })?
            .await??;
        Ok(proto::Ack {})
    }

    async fn handle_open_commit_message_buffer(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::OpenCommitMessageBuffer>,
        mut cx: AsyncApp,
    ) -> Result<proto::OpenBufferResponse> {
        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
        let repository = Self::repository_for_request(&this, repository_id, &mut cx)?;
        let buffer = repository
            .update(&mut cx, |repository, cx| {
                repository.open_commit_buffer(None, this.read(cx).buffer_store.clone(), cx)
            })?
            .await?;

        let buffer_id = buffer.read_with(&cx, |buffer, _| buffer.remote_id())?;
        this.update(&mut cx, |this, cx| {
            this.buffer_store.update(cx, |buffer_store, cx| {
                buffer_store
                    .create_buffer_for_peer(
                        &buffer,
                        envelope.original_sender_id.unwrap_or(envelope.sender_id),
                        cx,
                    )
                    .detach_and_log_err(cx);
            })
        })?;

        Ok(proto::OpenBufferResponse {
            buffer_id: buffer_id.to_proto(),
        })
    }

    async fn handle_askpass(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::AskPassRequest>,
        mut cx: AsyncApp,
    ) -> Result<proto::AskPassResponse> {
        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
        let repository = Self::repository_for_request(&this, repository_id, &mut cx)?;

        let delegates = cx.update(|cx| repository.read(cx).askpass_delegates.clone())?;
        let Some(mut askpass) = delegates.lock().remove(&envelope.payload.askpass_id) else {
            debug_panic!("no askpass found");
            anyhow::bail!("no askpass found");
        };

        let response = askpass.ask_password(envelope.payload.prompt).await?;

        delegates
            .lock()
            .insert(envelope.payload.askpass_id, askpass);

        Ok(proto::AskPassResponse { response })
    }

    async fn handle_check_for_pushed_commits(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::CheckForPushedCommits>,
        mut cx: AsyncApp,
    ) -> Result<proto::CheckForPushedCommitsResponse> {
        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;

        let branches = repository_handle
            .update(&mut cx, |repository_handle, _| {
                repository_handle.check_for_pushed_commits()
            })?
            .await??;
        Ok(proto::CheckForPushedCommitsResponse {
            pushed_to: branches
                .into_iter()
                .map(|commit| commit.to_string())
                .collect(),
        })
    }

    async fn handle_git_diff(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::GitDiff>,
        mut cx: AsyncApp,
    ) -> Result<proto::GitDiffResponse> {
        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
        let diff_type = match envelope.payload.diff_type() {
            proto::git_diff::DiffType::HeadToIndex => DiffType::HeadToIndex,
            proto::git_diff::DiffType::HeadToWorktree => DiffType::HeadToWorktree,
        };

        let mut diff = repository_handle
            .update(&mut cx, |repository_handle, cx| {
                repository_handle.diff(diff_type, cx)
2014 })?
2015 .await??;
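        // Cap the response size. Note that this truncates by character count, so the
        // payload can still slightly exceed one megabyte for multi-byte content.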
2016 const ONE_MB: usize = 1_000_000;
2017 if diff.len() > ONE_MB {
2018 diff = diff.chars().take(ONE_MB).collect()
2019 }
2020
2021 Ok(proto::GitDiffResponse { diff })
2022 }
2023
2024 async fn handle_open_unstaged_diff(
2025 this: Entity<Self>,
2026 request: TypedEnvelope<proto::OpenUnstagedDiff>,
2027 mut cx: AsyncApp,
2028 ) -> Result<proto::OpenUnstagedDiffResponse> {
2029 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2030 let diff = this
2031 .update(&mut cx, |this, cx| {
2032 let buffer = this.buffer_store.read(cx).get(buffer_id)?;
2033 Some(this.open_unstaged_diff(buffer, cx))
2034 })?
2035 .context("missing buffer")?
2036 .await?;
2037 this.update(&mut cx, |this, _| {
2038 let shared_diffs = this
2039 .shared_diffs
2040 .entry(request.original_sender_id.unwrap_or(request.sender_id))
2041 .or_default();
2042 shared_diffs.entry(buffer_id).or_default().unstaged = Some(diff.clone());
2043 })?;
2044 let staged_text = diff.read_with(&cx, |diff, _| diff.base_text_string())?;
2045 Ok(proto::OpenUnstagedDiffResponse { staged_text })
2046 }
2047
2048 async fn handle_open_uncommitted_diff(
2049 this: Entity<Self>,
2050 request: TypedEnvelope<proto::OpenUncommittedDiff>,
2051 mut cx: AsyncApp,
2052 ) -> Result<proto::OpenUncommittedDiffResponse> {
2053 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2054 let diff = this
2055 .update(&mut cx, |this, cx| {
2056 let buffer = this.buffer_store.read(cx).get(buffer_id)?;
2057 Some(this.open_uncommitted_diff(buffer, cx))
2058 })?
2059 .context("missing buffer")?
2060 .await?;
2061 this.update(&mut cx, |this, _| {
2062 let shared_diffs = this
2063 .shared_diffs
2064 .entry(request.original_sender_id.unwrap_or(request.sender_id))
2065 .or_default();
2066 shared_diffs.entry(buffer_id).or_default().uncommitted = Some(diff.clone());
2067 })?;
2068 diff.read_with(&cx, |diff, cx| {
2069 use proto::open_uncommitted_diff_response::Mode;
2070
2071 let unstaged_diff = diff.secondary_diff();
2072 let index_snapshot = unstaged_diff.and_then(|diff| {
2073 let diff = diff.read(cx);
2074 diff.base_text_exists().then(|| diff.base_text())
2075 });
2076
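            // Decide which base texts to send back: when the index snapshot is the
            // same text as HEAD, only the committed text is sent and the mode marks
            // the index as matching; otherwise each existing base text is included.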
2077 let mode;
2078 let staged_text;
2079 let committed_text;
2080 if diff.base_text_exists() {
2081 let committed_snapshot = diff.base_text();
2082 committed_text = Some(committed_snapshot.text());
2083 if let Some(index_text) = index_snapshot {
2084 if index_text.remote_id() == committed_snapshot.remote_id() {
2085 mode = Mode::IndexMatchesHead;
2086 staged_text = None;
2087 } else {
2088 mode = Mode::IndexAndHead;
2089 staged_text = Some(index_text.text());
2090 }
2091 } else {
2092 mode = Mode::IndexAndHead;
2093 staged_text = None;
2094 }
2095 } else {
2096 mode = Mode::IndexAndHead;
2097 committed_text = None;
2098 staged_text = index_snapshot.as_ref().map(|buffer| buffer.text());
2099 }
2100
2101 proto::OpenUncommittedDiffResponse {
2102 committed_text,
2103 staged_text,
2104 mode: mode.into(),
2105 }
2106 })
2107 }
2108
2109 async fn handle_update_diff_bases(
2110 this: Entity<Self>,
2111 request: TypedEnvelope<proto::UpdateDiffBases>,
2112 mut cx: AsyncApp,
2113 ) -> Result<()> {
2114 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2115 this.update(&mut cx, |this, cx| {
2116 if let Some(diff_state) = this.diffs.get_mut(&buffer_id) {
2117 if let Some(buffer) = this.buffer_store.read(cx).get(buffer_id) {
2118 let buffer = buffer.read(cx).text_snapshot();
2119 diff_state.update(cx, |diff_state, cx| {
2120 diff_state.handle_base_texts_updated(buffer, request.payload, cx);
2121 })
2122 }
2123 }
2124 })
2125 }
2126
2127 async fn handle_blame_buffer(
2128 this: Entity<Self>,
2129 envelope: TypedEnvelope<proto::BlameBuffer>,
2130 mut cx: AsyncApp,
2131 ) -> Result<proto::BlameBufferResponse> {
2132 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2133 let version = deserialize_version(&envelope.payload.version);
2134 let buffer = this.read_with(&cx, |this, cx| {
2135 this.buffer_store.read(cx).get_existing(buffer_id)
2136 })??;
2137 buffer
2138 .update(&mut cx, |buffer, _| {
2139 buffer.wait_for_version(version.clone())
2140 })?
2141 .await?;
2142 let blame = this
2143 .update(&mut cx, |this, cx| {
2144 this.blame_buffer(&buffer, Some(version), cx)
2145 })?
2146 .await?;
2147 Ok(serialize_blame_buffer_response(blame))
2148 }
2149
2150 async fn handle_get_permalink_to_line(
2151 this: Entity<Self>,
2152 envelope: TypedEnvelope<proto::GetPermalinkToLine>,
2153 mut cx: AsyncApp,
2154 ) -> Result<proto::GetPermalinkToLineResponse> {
2155 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
        // Note: the payload's `version` field is currently unused; the permalink is
        // computed against the buffer's current contents.
2157 let selection = {
2158 let proto_selection = envelope
2159 .payload
2160 .selection
                .context("no selection provided for permalink request")?;
2162 proto_selection.start as u32..proto_selection.end as u32
2163 };
2164 let buffer = this.read_with(&cx, |this, cx| {
2165 this.buffer_store.read(cx).get_existing(buffer_id)
2166 })??;
2167 let permalink = this
2168 .update(&mut cx, |this, cx| {
2169 this.get_permalink_to_line(&buffer, selection, cx)
2170 })?
2171 .await?;
2172 Ok(proto::GetPermalinkToLineResponse {
2173 permalink: permalink.to_string(),
2174 })
2175 }
2176
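    /// Looks up the repository targeted by a remote request, returning an error
    /// if the id is not known to this store.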
2177 fn repository_for_request(
2178 this: &Entity<Self>,
2179 id: RepositoryId,
2180 cx: &mut AsyncApp,
2181 ) -> Result<Entity<Repository>> {
2182 this.read_with(cx, |this, _| {
2183 this.repositories
2184 .get(&id)
2185 .context("missing repository handle")
2186 .cloned()
2187 })?
2188 }
2189
2190 pub fn repo_snapshots(&self, cx: &App) -> HashMap<RepositoryId, RepositorySnapshot> {
2191 self.repositories
2192 .iter()
2193 .map(|(id, repo)| (*id, repo.read(cx).snapshot.clone()))
2194 .collect()
2195 }
2196}
2197
2198impl BufferGitState {
2199 fn new(_git_store: WeakEntity<GitStore>) -> Self {
2200 Self {
2201 unstaged_diff: Default::default(),
2202 uncommitted_diff: Default::default(),
2203 recalculate_diff_task: Default::default(),
2204 language: Default::default(),
2205 language_registry: Default::default(),
2206 recalculating_tx: postage::watch::channel_with(false).0,
2207 hunk_staging_operation_count: 0,
2208 hunk_staging_operation_count_as_of_write: 0,
2209 head_text: Default::default(),
2210 index_text: Default::default(),
2211 head_changed: Default::default(),
2212 index_changed: Default::default(),
2213 language_changed: Default::default(),
2214 conflict_updated_futures: Default::default(),
2215 conflict_set: Default::default(),
2216 reparse_conflict_markers_task: Default::default(),
2217 }
2218 }
2219
2220 fn buffer_language_changed(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) {
2221 self.language = buffer.read(cx).language().cloned();
2222 self.language_changed = true;
2223 let _ = self.recalculate_diffs(buffer.read(cx).text_snapshot(), cx);
2224 }
2225
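    /// Re-parses conflict markers in the given buffer snapshot and updates the
    /// associated `ConflictSet`. The returned receiver fires once the new
    /// snapshot has been applied; if no conflict set is attached or it has no
    /// recorded conflict, no reparse is scheduled and the sender is dropped.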
2226 fn reparse_conflict_markers(
2227 &mut self,
2228 buffer: text::BufferSnapshot,
2229 cx: &mut Context<Self>,
2230 ) -> oneshot::Receiver<()> {
2231 let (tx, rx) = oneshot::channel();
2232
2233 let Some(conflict_set) = self
2234 .conflict_set
2235 .as_ref()
2236 .and_then(|conflict_set| conflict_set.upgrade())
2237 else {
2238 return rx;
2239 };
2240
2241 let old_snapshot = conflict_set.read_with(cx, |conflict_set, _| {
2242 if conflict_set.has_conflict {
2243 Some(conflict_set.snapshot())
2244 } else {
2245 None
2246 }
2247 });
2248
2249 if let Some(old_snapshot) = old_snapshot {
2250 self.conflict_updated_futures.push(tx);
2251 self.reparse_conflict_markers_task = Some(cx.spawn(async move |this, cx| {
2252 let (snapshot, changed_range) = cx
2253 .background_spawn(async move {
2254 let new_snapshot = ConflictSet::parse(&buffer);
2255 let changed_range = old_snapshot.compare(&new_snapshot, &buffer);
2256 (new_snapshot, changed_range)
2257 })
2258 .await;
2259 this.update(cx, |this, cx| {
2260 if let Some(conflict_set) = &this.conflict_set {
2261 conflict_set
2262 .update(cx, |conflict_set, cx| {
2263 conflict_set.set_snapshot(snapshot, changed_range, cx);
2264 })
2265 .ok();
2266 }
2267 let futures = std::mem::take(&mut this.conflict_updated_futures);
2268 for tx in futures {
2269 tx.send(()).ok();
2270 }
2271 })
2272 }))
2273 }
2274
2275 rx
2276 }
2277
2278 fn unstaged_diff(&self) -> Option<Entity<BufferDiff>> {
2279 self.unstaged_diff.as_ref().and_then(|set| set.upgrade())
2280 }
2281
2282 fn uncommitted_diff(&self) -> Option<Entity<BufferDiff>> {
2283 self.uncommitted_diff.as_ref().and_then(|set| set.upgrade())
2284 }
2285
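    /// Applies head/index base texts received in an `UpdateDiffBases` message
    /// and kicks off a diff recalculation.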
2286 fn handle_base_texts_updated(
2287 &mut self,
2288 buffer: text::BufferSnapshot,
2289 message: proto::UpdateDiffBases,
2290 cx: &mut Context<Self>,
2291 ) {
2292 use proto::update_diff_bases::Mode;
2293
2294 let Some(mode) = Mode::from_i32(message.mode) else {
2295 return;
2296 };
2297
2298 let diff_bases_change = match mode {
2299 Mode::HeadOnly => DiffBasesChange::SetHead(message.committed_text),
2300 Mode::IndexOnly => DiffBasesChange::SetIndex(message.staged_text),
2301 Mode::IndexMatchesHead => DiffBasesChange::SetBoth(message.committed_text),
2302 Mode::IndexAndHead => DiffBasesChange::SetEach {
2303 index: message.staged_text,
2304 head: message.committed_text,
2305 },
2306 };
2307
2308 self.diff_bases_changed(buffer, Some(diff_bases_change), cx);
2309 }
2310
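    /// Returns a future that resolves once the in-flight diff recalculation
    /// finishes, or `None` if no recalculation is currently running.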
2311 pub fn wait_for_recalculation(&mut self) -> Option<impl Future<Output = ()> + use<>> {
2312 if *self.recalculating_tx.borrow() {
2313 let mut rx = self.recalculating_tx.subscribe();
2314 return Some(async move {
2315 loop {
2316 let is_recalculating = rx.recv().await;
2317 if is_recalculating != Some(true) {
2318 break;
2319 }
2320 }
2321 });
2322 } else {
2323 None
2324 }
2325 }
2326
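    /// Records new base texts (normalizing their line endings) and triggers a
    /// recalculation of the diffs against the given buffer snapshot.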
2327 fn diff_bases_changed(
2328 &mut self,
2329 buffer: text::BufferSnapshot,
2330 diff_bases_change: Option<DiffBasesChange>,
2331 cx: &mut Context<Self>,
2332 ) {
2333 match diff_bases_change {
2334 Some(DiffBasesChange::SetIndex(index)) => {
2335 self.index_text = index.map(|mut index| {
2336 text::LineEnding::normalize(&mut index);
2337 Arc::new(index)
2338 });
2339 self.index_changed = true;
2340 }
2341 Some(DiffBasesChange::SetHead(head)) => {
2342 self.head_text = head.map(|mut head| {
2343 text::LineEnding::normalize(&mut head);
2344 Arc::new(head)
2345 });
2346 self.head_changed = true;
2347 }
2348 Some(DiffBasesChange::SetBoth(text)) => {
2349 let text = text.map(|mut text| {
2350 text::LineEnding::normalize(&mut text);
2351 Arc::new(text)
2352 });
2353 self.head_text = text.clone();
2354 self.index_text = text;
2355 self.head_changed = true;
2356 self.index_changed = true;
2357 }
2358 Some(DiffBasesChange::SetEach { index, head }) => {
2359 self.index_text = index.map(|mut index| {
2360 text::LineEnding::normalize(&mut index);
2361 Arc::new(index)
2362 });
2363 self.index_changed = true;
2364 self.head_text = head.map(|mut head| {
2365 text::LineEnding::normalize(&mut head);
2366 Arc::new(head)
2367 });
2368 self.head_changed = true;
2369 }
2370 None => {}
2371 }
2372
2373 self.recalculate_diffs(buffer, cx)
2374 }
2375
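    /// Recomputes the unstaged and uncommitted diffs on a background task.
    /// When the index matches HEAD the unstaged snapshot is reused for the
    /// uncommitted diff, and the whole recalculation is abandoned if further
    /// hunk staging operations arrive while it is running.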
2376 fn recalculate_diffs(&mut self, buffer: text::BufferSnapshot, cx: &mut Context<Self>) {
2377 *self.recalculating_tx.borrow_mut() = true;
2378
2379 let language = self.language.clone();
2380 let language_registry = self.language_registry.clone();
2381 let unstaged_diff = self.unstaged_diff();
2382 let uncommitted_diff = self.uncommitted_diff();
2383 let head = self.head_text.clone();
2384 let index = self.index_text.clone();
2385 let index_changed = self.index_changed;
2386 let head_changed = self.head_changed;
2387 let language_changed = self.language_changed;
2388 let prev_hunk_staging_operation_count = self.hunk_staging_operation_count_as_of_write;
2389 let index_matches_head = match (self.index_text.as_ref(), self.head_text.as_ref()) {
2390 (Some(index), Some(head)) => Arc::ptr_eq(index, head),
2391 (None, None) => true,
2392 _ => false,
2393 };
2394 self.recalculate_diff_task = Some(cx.spawn(async move |this, cx| {
2395 log::debug!(
2396 "start recalculating diffs for buffer {}",
2397 buffer.remote_id()
2398 );
2399
2400 let mut new_unstaged_diff = None;
2401 if let Some(unstaged_diff) = &unstaged_diff {
2402 new_unstaged_diff = Some(
2403 BufferDiff::update_diff(
2404 unstaged_diff.clone(),
2405 buffer.clone(),
2406 index,
2407 index_changed,
2408 language_changed,
2409 language.clone(),
2410 language_registry.clone(),
2411 cx,
2412 )
2413 .await?,
2414 );
2415 }
2416
2417 let mut new_uncommitted_diff = None;
2418 if let Some(uncommitted_diff) = &uncommitted_diff {
2419 new_uncommitted_diff = if index_matches_head {
2420 new_unstaged_diff.clone()
2421 } else {
2422 Some(
2423 BufferDiff::update_diff(
2424 uncommitted_diff.clone(),
2425 buffer.clone(),
2426 head,
2427 head_changed,
2428 language_changed,
2429 language.clone(),
2430 language_registry.clone(),
2431 cx,
2432 )
2433 .await?,
2434 )
2435 }
2436 }
2437
2438 let cancel = this.update(cx, |this, _| {
2439 // This checks whether all pending stage/unstage operations
2440 // have quiesced (i.e. both the corresponding write and the
2441 // read of that write have completed). If not, then we cancel
2442 // this recalculation attempt to avoid invalidating pending
2443 // state too quickly; another recalculation will come along
2444 // later and clear the pending state once the state of the index has settled.
2445 if this.hunk_staging_operation_count > prev_hunk_staging_operation_count {
2446 *this.recalculating_tx.borrow_mut() = false;
2447 true
2448 } else {
2449 false
2450 }
2451 })?;
2452 if cancel {
                log::debug!(
                    concat!(
                        "aborting diff recalculation for buffer {} ",
                        "due to subsequent hunk staging operations",
                    ),
                    buffer.remote_id()
                );
2460 return Ok(());
2461 }
2462
2463 let unstaged_changed_range = if let Some((unstaged_diff, new_unstaged_diff)) =
2464 unstaged_diff.as_ref().zip(new_unstaged_diff.clone())
2465 {
2466 unstaged_diff.update(cx, |diff, cx| {
2467 if language_changed {
2468 diff.language_changed(cx);
2469 }
2470 diff.set_snapshot(new_unstaged_diff, &buffer, cx)
2471 })?
2472 } else {
2473 None
2474 };
2475
2476 if let Some((uncommitted_diff, new_uncommitted_diff)) =
2477 uncommitted_diff.as_ref().zip(new_uncommitted_diff.clone())
2478 {
2479 uncommitted_diff.update(cx, |diff, cx| {
2480 if language_changed {
2481 diff.language_changed(cx);
2482 }
2483 diff.set_snapshot_with_secondary(
2484 new_uncommitted_diff,
2485 &buffer,
2486 unstaged_changed_range,
2487 true,
2488 cx,
2489 );
2490 })?;
2491 }
2492
2493 log::debug!(
2494 "finished recalculating diffs for buffer {}",
2495 buffer.remote_id()
2496 );
2497
2498 if let Some(this) = this.upgrade() {
2499 this.update(cx, |this, _| {
2500 this.index_changed = false;
2501 this.head_changed = false;
2502 this.language_changed = false;
2503 *this.recalculating_tx.borrow_mut() = false;
2504 })?;
2505 }
2506
2507 Ok(())
2508 }));
2509 }
2510}
2511
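/// Builds an askpass delegate that forwards credential prompts to the
/// downstream client and relays its response back to the caller.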
2512fn make_remote_delegate(
2513 this: Entity<GitStore>,
2514 project_id: u64,
2515 repository_id: RepositoryId,
2516 askpass_id: u64,
2517 cx: &mut AsyncApp,
2518) -> AskPassDelegate {
2519 AskPassDelegate::new(cx, move |prompt, tx, cx| {
2520 this.update(cx, |this, cx| {
2521 let Some((client, _)) = this.downstream_client() else {
2522 return;
2523 };
2524 let response = client.request(proto::AskPassRequest {
2525 project_id,
2526 repository_id: repository_id.to_proto(),
2527 askpass_id,
2528 prompt,
2529 });
2530 cx.spawn(async move |_, _| {
2531 tx.send(response.await?.response).ok();
2532 anyhow::Ok(())
2533 })
2534 .detach_and_log_err(cx);
2535 })
2536 .log_err();
2537 })
2538}
2539
2540impl RepositoryId {
2541 pub fn to_proto(self) -> u64 {
2542 self.0
2543 }
2544
2545 pub fn from_proto(id: u64) -> Self {
2546 RepositoryId(id)
2547 }
2548}
2549
2550impl RepositorySnapshot {
2551 fn empty(id: RepositoryId, work_directory_abs_path: Arc<Path>) -> Self {
2552 Self {
2553 id,
2554 statuses_by_path: Default::default(),
2555 work_directory_abs_path,
2556 branch: None,
2557 head_commit: None,
2558 scan_id: 0,
2559 merge: Default::default(),
2560 }
2561 }
2562
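    /// Serializes the complete repository state for a newly connected
    /// downstream client.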
2563 fn initial_update(&self, project_id: u64) -> proto::UpdateRepository {
2564 proto::UpdateRepository {
2565 branch_summary: self.branch.as_ref().map(branch_to_proto),
2566 head_commit_details: self.head_commit.as_ref().map(commit_details_to_proto),
2567 updated_statuses: self
2568 .statuses_by_path
2569 .iter()
2570 .map(|entry| entry.to_proto())
2571 .collect(),
2572 removed_statuses: Default::default(),
2573 current_merge_conflicts: self
2574 .merge
2575 .conflicted_paths
2576 .iter()
2577 .map(|repo_path| repo_path.to_proto())
2578 .collect(),
2579 project_id,
2580 id: self.id.to_proto(),
2581 abs_path: self.work_directory_abs_path.to_proto(),
2582 entry_ids: vec![self.id.to_proto()],
2583 scan_id: self.scan_id,
2584 is_last_update: true,
2585 }
2586 }
2587
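    /// Builds an incremental update by walking the old and new status trees in
    /// lockstep, emitting only entries that were added, changed, or removed.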
2588 fn build_update(&self, old: &Self, project_id: u64) -> proto::UpdateRepository {
2589 let mut updated_statuses: Vec<proto::StatusEntry> = Vec::new();
2590 let mut removed_statuses: Vec<String> = Vec::new();
2591
2592 let mut new_statuses = self.statuses_by_path.iter().peekable();
2593 let mut old_statuses = old.statuses_by_path.iter().peekable();
2594
2595 let mut current_new_entry = new_statuses.next();
2596 let mut current_old_entry = old_statuses.next();
2597 loop {
2598 match (current_new_entry, current_old_entry) {
2599 (Some(new_entry), Some(old_entry)) => {
2600 match new_entry.repo_path.cmp(&old_entry.repo_path) {
2601 Ordering::Less => {
2602 updated_statuses.push(new_entry.to_proto());
2603 current_new_entry = new_statuses.next();
2604 }
2605 Ordering::Equal => {
2606 if new_entry.status != old_entry.status {
2607 updated_statuses.push(new_entry.to_proto());
2608 }
2609 current_old_entry = old_statuses.next();
2610 current_new_entry = new_statuses.next();
2611 }
2612 Ordering::Greater => {
2613 removed_statuses.push(old_entry.repo_path.as_ref().to_proto());
2614 current_old_entry = old_statuses.next();
2615 }
2616 }
2617 }
2618 (None, Some(old_entry)) => {
2619 removed_statuses.push(old_entry.repo_path.as_ref().to_proto());
2620 current_old_entry = old_statuses.next();
2621 }
2622 (Some(new_entry), None) => {
2623 updated_statuses.push(new_entry.to_proto());
2624 current_new_entry = new_statuses.next();
2625 }
2626 (None, None) => break,
2627 }
2628 }
2629
2630 proto::UpdateRepository {
2631 branch_summary: self.branch.as_ref().map(branch_to_proto),
2632 head_commit_details: self.head_commit.as_ref().map(commit_details_to_proto),
2633 updated_statuses,
2634 removed_statuses,
2635 current_merge_conflicts: self
2636 .merge
2637 .conflicted_paths
2638 .iter()
2639 .map(|path| path.as_ref().to_proto())
2640 .collect(),
2641 project_id,
2642 id: self.id.to_proto(),
2643 abs_path: self.work_directory_abs_path.to_proto(),
2644 entry_ids: vec![],
2645 scan_id: self.scan_id,
2646 is_last_update: true,
2647 }
2648 }
2649
2650 pub fn status(&self) -> impl Iterator<Item = StatusEntry> + '_ {
2651 self.statuses_by_path.iter().cloned()
2652 }
2653
2654 pub fn status_summary(&self) -> GitSummary {
2655 self.statuses_by_path.summary().item_summary
2656 }
2657
2658 pub fn status_for_path(&self, path: &RepoPath) -> Option<StatusEntry> {
2659 self.statuses_by_path
2660 .get(&PathKey(path.0.clone()), &())
2661 .cloned()
2662 }
2663
2664 pub fn abs_path_to_repo_path(&self, abs_path: &Path) -> Option<RepoPath> {
2665 abs_path
2666 .strip_prefix(&self.work_directory_abs_path)
2667 .map(RepoPath::from)
2668 .ok()
2669 }
2670
2671 pub fn has_conflict(&self, repo_path: &RepoPath) -> bool {
2672 self.merge.conflicted_paths.contains(repo_path)
2673 }
2674
    /// The name displayed for this repository in the repository selector.
2676 pub fn display_name(&self) -> SharedString {
2677 self.work_directory_abs_path
2678 .file_name()
2679 .unwrap_or_default()
2680 .to_string_lossy()
2681 .to_string()
2682 .into()
2683 }
2684}
2685
2686impl MergeDetails {
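    /// Loads the current merge state: the in-progress operation heads (merge,
    /// cherry-pick, rebase, revert, apply), the merge message, and the set of
    /// conflicted paths. Also returns whether the heads changed since
    /// `prev_snapshot`.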
2687 async fn load(
2688 backend: &Arc<dyn GitRepository>,
2689 status: &SumTree<StatusEntry>,
2690 prev_snapshot: &RepositorySnapshot,
2691 ) -> Result<(MergeDetails, bool)> {
2692 log::debug!("load merge details");
2693 let message = backend.merge_message().await;
2694 let heads = backend
2695 .revparse_batch(vec![
2696 "MERGE_HEAD".into(),
2697 "CHERRY_PICK_HEAD".into(),
2698 "REBASE_HEAD".into(),
2699 "REVERT_HEAD".into(),
2700 "APPLY_HEAD".into(),
2701 ])
2702 .await
2703 .log_err()
2704 .unwrap_or_default()
2705 .into_iter()
2706 .map(|opt| opt.map(SharedString::from))
2707 .collect::<Vec<_>>();
2708 let merge_heads_changed = heads != prev_snapshot.merge.heads;
2709 let conflicted_paths = if merge_heads_changed {
2710 let current_conflicted_paths = TreeSet::from_ordered_entries(
2711 status
2712 .iter()
2713 .filter(|entry| entry.status.is_conflicted())
2714 .map(|entry| entry.repo_path.clone()),
2715 );
2716
            // A scan can run while a lengthy merge is still in progress: the merge
            // will eventually produce conflicts, but `git status` doesn't report
            // them yet. Since we currently track merge-head state only to detect
            // conflicts, don't update it until some conflicts actually appear.
2722 if heads.iter().any(Option::is_some)
2723 && !prev_snapshot.merge.heads.iter().any(Option::is_some)
2724 && current_conflicted_paths.is_empty()
2725 {
2726 log::debug!("not updating merge heads because no conflicts found");
2727 return Ok((
2728 MergeDetails {
2729 message: message.map(SharedString::from),
2730 ..prev_snapshot.merge.clone()
2731 },
2732 false,
2733 ));
2734 }
2735
2736 current_conflicted_paths
2737 } else {
2738 prev_snapshot.merge.conflicted_paths.clone()
2739 };
2740 let details = MergeDetails {
2741 conflicted_paths,
2742 message: message.map(SharedString::from),
2743 heads,
2744 };
2745 Ok((details, merge_heads_changed))
2746 }
2747}
2748
2749impl Repository {
2750 pub fn snapshot(&self) -> RepositorySnapshot {
2751 self.snapshot.clone()
2752 }
2753
2754 fn local(
2755 id: RepositoryId,
2756 work_directory_abs_path: Arc<Path>,
2757 dot_git_abs_path: Arc<Path>,
2758 repository_dir_abs_path: Arc<Path>,
2759 common_dir_abs_path: Arc<Path>,
2760 project_environment: WeakEntity<ProjectEnvironment>,
2761 fs: Arc<dyn Fs>,
2762 git_store: WeakEntity<GitStore>,
2763 cx: &mut Context<Self>,
2764 ) -> Self {
2765 let snapshot = RepositorySnapshot::empty(id, work_directory_abs_path.clone());
2766 Repository {
2767 this: cx.weak_entity(),
2768 git_store,
2769 snapshot,
2770 commit_message_buffer: None,
2771 askpass_delegates: Default::default(),
2772 paths_needing_status_update: Default::default(),
2773 latest_askpass_id: 0,
2774 job_sender: Repository::spawn_local_git_worker(
2775 work_directory_abs_path,
2776 dot_git_abs_path,
2777 repository_dir_abs_path,
2778 common_dir_abs_path,
2779 project_environment,
2780 fs,
2781 cx,
2782 ),
2783 job_id: 0,
2784 active_jobs: Default::default(),
2785 }
2786 }
2787
2788 fn remote(
2789 id: RepositoryId,
2790 work_directory_abs_path: Arc<Path>,
2791 project_id: ProjectId,
2792 client: AnyProtoClient,
2793 git_store: WeakEntity<GitStore>,
2794 cx: &mut Context<Self>,
2795 ) -> Self {
2796 let snapshot = RepositorySnapshot::empty(id, work_directory_abs_path);
2797 Self {
2798 this: cx.weak_entity(),
2799 snapshot,
2800 commit_message_buffer: None,
2801 git_store,
2802 paths_needing_status_update: Default::default(),
2803 job_sender: Self::spawn_remote_git_worker(project_id, client, cx),
2804 askpass_delegates: Default::default(),
2805 latest_askpass_id: 0,
2806 active_jobs: Default::default(),
2807 job_id: 0,
2808 }
2809 }
2810
2811 pub fn git_store(&self) -> Option<Entity<GitStore>> {
2812 self.git_store.upgrade()
2813 }
2814
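    /// Reloads index and HEAD texts for every open buffer belonging to this
    /// repository, forwarding any changes to the corresponding diff state and,
    /// when the project is shared, to downstream clients.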
2815 fn reload_buffer_diff_bases(&mut self, cx: &mut Context<Self>) {
2816 let this = cx.weak_entity();
2817 let git_store = self.git_store.clone();
2818 let _ = self.send_keyed_job(
2819 Some(GitJobKey::ReloadBufferDiffBases),
2820 None,
2821 |state, mut cx| async move {
2822 let RepositoryState::Local { backend, .. } = state else {
2823 log::error!("tried to recompute diffs for a non-local repository");
2824 return Ok(());
2825 };
2826
2827 let Some(this) = this.upgrade() else {
2828 return Ok(());
2829 };
2830
2831 let repo_diff_state_updates = this.update(&mut cx, |this, cx| {
2832 git_store.update(cx, |git_store, cx| {
2833 git_store
2834 .diffs
2835 .iter()
2836 .filter_map(|(buffer_id, diff_state)| {
2837 let buffer_store = git_store.buffer_store.read(cx);
2838 let buffer = buffer_store.get(*buffer_id)?;
2839 let file = File::from_dyn(buffer.read(cx).file())?;
2840 let abs_path =
2841 file.worktree.read(cx).absolutize(&file.path).ok()?;
2842 let repo_path = this.abs_path_to_repo_path(&abs_path)?;
2843 log::debug!(
2844 "start reload diff bases for repo path {}",
2845 repo_path.0.display()
2846 );
2847 diff_state.update(cx, |diff_state, _| {
2848 let has_unstaged_diff = diff_state
2849 .unstaged_diff
2850 .as_ref()
2851 .is_some_and(|diff| diff.is_upgradable());
2852 let has_uncommitted_diff = diff_state
2853 .uncommitted_diff
2854 .as_ref()
2855 .is_some_and(|set| set.is_upgradable());
2856
2857 Some((
2858 buffer,
2859 repo_path,
2860 has_unstaged_diff.then(|| diff_state.index_text.clone()),
2861 has_uncommitted_diff.then(|| diff_state.head_text.clone()),
2862 ))
2863 })
2864 })
2865 .collect::<Vec<_>>()
2866 })
2867 })??;
2868
2869 let buffer_diff_base_changes = cx
2870 .background_spawn(async move {
2871 let mut changes = Vec::new();
2872 for (buffer, repo_path, current_index_text, current_head_text) in
2873 &repo_diff_state_updates
2874 {
2875 let index_text = if current_index_text.is_some() {
2876 backend.load_index_text(repo_path.clone()).await
2877 } else {
2878 None
2879 };
2880 let head_text = if current_head_text.is_some() {
2881 backend.load_committed_text(repo_path.clone()).await
2882 } else {
2883 None
2884 };
2885
2886 let change =
2887 match (current_index_text.as_ref(), current_head_text.as_ref()) {
2888 (Some(current_index), Some(current_head)) => {
2889 let index_changed =
2890 index_text.as_ref() != current_index.as_deref();
2891 let head_changed =
2892 head_text.as_ref() != current_head.as_deref();
2893 if index_changed && head_changed {
2894 if index_text == head_text {
2895 Some(DiffBasesChange::SetBoth(head_text))
2896 } else {
2897 Some(DiffBasesChange::SetEach {
2898 index: index_text,
2899 head: head_text,
2900 })
2901 }
2902 } else if index_changed {
2903 Some(DiffBasesChange::SetIndex(index_text))
2904 } else if head_changed {
2905 Some(DiffBasesChange::SetHead(head_text))
2906 } else {
2907 None
2908 }
2909 }
2910 (Some(current_index), None) => {
2911 let index_changed =
2912 index_text.as_ref() != current_index.as_deref();
2913 index_changed
2914 .then_some(DiffBasesChange::SetIndex(index_text))
2915 }
2916 (None, Some(current_head)) => {
2917 let head_changed =
2918 head_text.as_ref() != current_head.as_deref();
2919 head_changed.then_some(DiffBasesChange::SetHead(head_text))
2920 }
2921 (None, None) => None,
2922 };
2923
2924 changes.push((buffer.clone(), change))
2925 }
2926 changes
2927 })
2928 .await;
2929
2930 git_store.update(&mut cx, |git_store, cx| {
2931 for (buffer, diff_bases_change) in buffer_diff_base_changes {
2932 let buffer_snapshot = buffer.read(cx).text_snapshot();
2933 let buffer_id = buffer_snapshot.remote_id();
2934 let Some(diff_state) = git_store.diffs.get(&buffer_id) else {
2935 continue;
2936 };
2937
2938 let downstream_client = git_store.downstream_client();
2939 diff_state.update(cx, |diff_state, cx| {
2940 use proto::update_diff_bases::Mode;
2941
2942 if let Some((diff_bases_change, (client, project_id))) =
2943 diff_bases_change.clone().zip(downstream_client)
2944 {
2945 let (staged_text, committed_text, mode) = match diff_bases_change {
2946 DiffBasesChange::SetIndex(index) => {
2947 (index, None, Mode::IndexOnly)
2948 }
2949 DiffBasesChange::SetHead(head) => (None, head, Mode::HeadOnly),
2950 DiffBasesChange::SetEach { index, head } => {
2951 (index, head, Mode::IndexAndHead)
2952 }
2953 DiffBasesChange::SetBoth(text) => {
2954 (None, text, Mode::IndexMatchesHead)
2955 }
2956 };
2957 client
2958 .send(proto::UpdateDiffBases {
2959 project_id: project_id.to_proto(),
2960 buffer_id: buffer_id.to_proto(),
2961 staged_text,
2962 committed_text,
2963 mode: mode as i32,
2964 })
2965 .log_err();
2966 }
2967
2968 diff_state.diff_bases_changed(buffer_snapshot, diff_bases_change, cx);
2969 });
2970 }
2971 })
2972 },
2973 );
2974 }
2975
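    /// Enqueues a job on the repository's background worker and returns a
    /// receiver for its result. If a status message is provided, it is shown
    /// as an active job while the work is in flight.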
2976 pub fn send_job<F, Fut, R>(
2977 &mut self,
2978 status: Option<SharedString>,
2979 job: F,
2980 ) -> oneshot::Receiver<R>
2981 where
2982 F: FnOnce(RepositoryState, AsyncApp) -> Fut + 'static,
2983 Fut: Future<Output = R> + 'static,
2984 R: Send + 'static,
2985 {
2986 self.send_keyed_job(None, status, job)
2987 }
2988
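    /// Like `send_job`, but additionally tags the job with an optional
    /// `GitJobKey`, letting the worker recognize related jobs (for example,
    /// successive index writes to the same path).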
2989 fn send_keyed_job<F, Fut, R>(
2990 &mut self,
2991 key: Option<GitJobKey>,
2992 status: Option<SharedString>,
2993 job: F,
2994 ) -> oneshot::Receiver<R>
2995 where
2996 F: FnOnce(RepositoryState, AsyncApp) -> Fut + 'static,
2997 Fut: Future<Output = R> + 'static,
2998 R: Send + 'static,
2999 {
3000 let (result_tx, result_rx) = futures::channel::oneshot::channel();
3001 let job_id = post_inc(&mut self.job_id);
3002 let this = self.this.clone();
3003 self.job_sender
3004 .unbounded_send(GitJob {
3005 key,
3006 job: Box::new(move |state, cx: &mut AsyncApp| {
3007 let job = job(state, cx.clone());
3008 cx.spawn(async move |cx| {
3009 if let Some(s) = status.clone() {
3010 this.update(cx, |this, cx| {
3011 this.active_jobs.insert(
3012 job_id,
3013 JobInfo {
3014 start: Instant::now(),
3015 message: s.clone(),
3016 },
3017 );
3018
3019 cx.notify();
3020 })
3021 .ok();
3022 }
3023 let result = job.await;
3024
3025 this.update(cx, |this, cx| {
3026 this.active_jobs.remove(&job_id);
3027 cx.notify();
3028 })
3029 .ok();
3030
3031 result_tx.send(result).ok();
3032 })
3033 }),
3034 })
3035 .ok();
3036 result_rx
3037 }
3038
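    /// Marks this repository as the store's active repository and emits an
    /// `ActiveRepositoryChanged` event.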
3039 pub fn set_as_active_repository(&self, cx: &mut Context<Self>) {
3040 let Some(git_store) = self.git_store.upgrade() else {
3041 return;
3042 };
3043 let entity = cx.entity();
3044 git_store.update(cx, |git_store, cx| {
3045 let Some((&id, _)) = git_store
3046 .repositories
3047 .iter()
3048 .find(|(_, handle)| *handle == &entity)
3049 else {
3050 return;
3051 };
3052 git_store.active_repo_id = Some(id);
3053 cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
3054 });
3055 }
3056
3057 pub fn cached_status(&self) -> impl '_ + Iterator<Item = StatusEntry> {
3058 self.snapshot.status()
3059 }
3060
3061 pub fn repo_path_to_project_path(&self, path: &RepoPath, cx: &App) -> Option<ProjectPath> {
3062 let git_store = self.git_store.upgrade()?;
3063 let worktree_store = git_store.read(cx).worktree_store.read(cx);
3064 let abs_path = self.snapshot.work_directory_abs_path.join(&path.0);
3065 let (worktree, relative_path) = worktree_store.find_worktree(abs_path, cx)?;
3066 Some(ProjectPath {
3067 worktree_id: worktree.read(cx).id(),
3068 path: relative_path.into(),
3069 })
3070 }
3071
3072 pub fn project_path_to_repo_path(&self, path: &ProjectPath, cx: &App) -> Option<RepoPath> {
3073 let git_store = self.git_store.upgrade()?;
3074 let worktree_store = git_store.read(cx).worktree_store.read(cx);
3075 let abs_path = worktree_store.absolutize(path, cx)?;
3076 self.snapshot.abs_path_to_repo_path(&abs_path)
3077 }
3078
3079 pub fn contains_sub_repo(&self, other: &Entity<Self>, cx: &App) -> bool {
3080 other
3081 .read(cx)
3082 .snapshot
3083 .work_directory_abs_path
3084 .starts_with(&self.snapshot.work_directory_abs_path)
3085 }
3086
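    /// Returns the buffer used for composing commit messages, creating it on
    /// demand. For remote repositories, the buffer is opened on the host and
    /// replicated locally before being returned.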
3087 pub fn open_commit_buffer(
3088 &mut self,
3089 languages: Option<Arc<LanguageRegistry>>,
3090 buffer_store: Entity<BufferStore>,
3091 cx: &mut Context<Self>,
3092 ) -> Task<Result<Entity<Buffer>>> {
3093 let id = self.id;
3094 if let Some(buffer) = self.commit_message_buffer.clone() {
3095 return Task::ready(Ok(buffer));
3096 }
3097 let this = cx.weak_entity();
3098
3099 let rx = self.send_job(None, move |state, mut cx| async move {
3100 let Some(this) = this.upgrade() else {
3101 bail!("git store was dropped");
3102 };
3103 match state {
3104 RepositoryState::Local { .. } => {
3105 this.update(&mut cx, |_, cx| {
3106 Self::open_local_commit_buffer(languages, buffer_store, cx)
3107 })?
3108 .await
3109 }
3110 RepositoryState::Remote { project_id, client } => {
3111 let request = client.request(proto::OpenCommitMessageBuffer {
3112 project_id: project_id.0,
3113 repository_id: id.to_proto(),
3114 });
3115 let response = request.await.context("requesting to open commit buffer")?;
3116 let buffer_id = BufferId::new(response.buffer_id)?;
3117 let buffer = buffer_store
3118 .update(&mut cx, |buffer_store, cx| {
3119 buffer_store.wait_for_remote_buffer(buffer_id, cx)
3120 })?
3121 .await?;
3122 if let Some(language_registry) = languages {
3123 let git_commit_language =
3124 language_registry.language_for_name("Git Commit").await?;
3125 buffer.update(&mut cx, |buffer, cx| {
3126 buffer.set_language(Some(git_commit_language), cx);
3127 })?;
3128 }
3129 this.update(&mut cx, |this, _| {
3130 this.commit_message_buffer = Some(buffer.clone());
3131 })?;
3132 Ok(buffer)
3133 }
3134 }
3135 });
3136
3137 cx.spawn(|_, _: &mut AsyncApp| async move { rx.await? })
3138 }
3139
3140 fn open_local_commit_buffer(
3141 language_registry: Option<Arc<LanguageRegistry>>,
3142 buffer_store: Entity<BufferStore>,
3143 cx: &mut Context<Self>,
3144 ) -> Task<Result<Entity<Buffer>>> {
3145 cx.spawn(async move |repository, cx| {
3146 let buffer = buffer_store
3147 .update(cx, |buffer_store, cx| buffer_store.create_buffer(cx))?
3148 .await?;
3149
3150 if let Some(language_registry) = language_registry {
3151 let git_commit_language = language_registry.language_for_name("Git Commit").await?;
3152 buffer.update(cx, |buffer, cx| {
3153 buffer.set_language(Some(git_commit_language), cx);
3154 })?;
3155 }
3156
3157 repository.update(cx, |repository, _| {
3158 repository.commit_message_buffer = Some(buffer.clone());
3159 })?;
3160 Ok(buffer)
3161 })
3162 }
3163
3164 pub fn checkout_files(
3165 &mut self,
3166 commit: &str,
3167 paths: Vec<RepoPath>,
3168 _cx: &mut App,
3169 ) -> oneshot::Receiver<Result<()>> {
3170 let commit = commit.to_string();
3171 let id = self.id;
3172
3173 self.send_job(
3174 Some(format!("git checkout {}", commit).into()),
3175 move |git_repo, _| async move {
3176 match git_repo {
3177 RepositoryState::Local {
3178 backend,
3179 environment,
3180 ..
3181 } => {
3182 backend
3183 .checkout_files(commit, paths, environment.clone())
3184 .await
3185 }
3186 RepositoryState::Remote { project_id, client } => {
3187 client
3188 .request(proto::GitCheckoutFiles {
3189 project_id: project_id.0,
3190 repository_id: id.to_proto(),
3191 commit,
3192 paths: paths
3193 .into_iter()
3194 .map(|p| p.to_string_lossy().to_string())
3195 .collect(),
3196 })
3197 .await?;
3198
3199 Ok(())
3200 }
3201 }
3202 },
3203 )
3204 }
3205
3206 pub fn reset(
3207 &mut self,
3208 commit: String,
3209 reset_mode: ResetMode,
3210 _cx: &mut App,
3211 ) -> oneshot::Receiver<Result<()>> {
3213 let id = self.id;
3214
3215 self.send_job(None, move |git_repo, _| async move {
3216 match git_repo {
3217 RepositoryState::Local {
3218 backend,
3219 environment,
3220 ..
3221 } => backend.reset(commit, reset_mode, environment).await,
3222 RepositoryState::Remote { project_id, client } => {
3223 client
3224 .request(proto::GitReset {
3225 project_id: project_id.0,
3226 repository_id: id.to_proto(),
3227 commit,
3228 mode: match reset_mode {
3229 ResetMode::Soft => git_reset::ResetMode::Soft.into(),
3230 ResetMode::Mixed => git_reset::ResetMode::Mixed.into(),
3231 },
3232 })
3233 .await?;
3234
3235 Ok(())
3236 }
3237 }
3238 })
3239 }
3240
3241 pub fn show(&mut self, commit: String) -> oneshot::Receiver<Result<CommitDetails>> {
3242 let id = self.id;
3243 self.send_job(None, move |git_repo, _cx| async move {
3244 match git_repo {
3245 RepositoryState::Local { backend, .. } => backend.show(commit).await,
3246 RepositoryState::Remote { project_id, client } => {
3247 let resp = client
3248 .request(proto::GitShow {
3249 project_id: project_id.0,
3250 repository_id: id.to_proto(),
3251 commit,
3252 })
3253 .await?;
3254
3255 Ok(CommitDetails {
3256 sha: resp.sha.into(),
3257 message: resp.message.into(),
3258 commit_timestamp: resp.commit_timestamp,
3259 author_email: resp.author_email.into(),
3260 author_name: resp.author_name.into(),
3261 })
3262 }
3263 }
3264 })
3265 }
3266
3267 pub fn load_commit_diff(&mut self, commit: String) -> oneshot::Receiver<Result<CommitDiff>> {
3268 let id = self.id;
3269 self.send_job(None, move |git_repo, cx| async move {
3270 match git_repo {
3271 RepositoryState::Local { backend, .. } => backend.load_commit(commit, cx).await,
3272 RepositoryState::Remote {
3273 client, project_id, ..
3274 } => {
3275 let response = client
3276 .request(proto::LoadCommitDiff {
3277 project_id: project_id.0,
3278 repository_id: id.to_proto(),
3279 commit,
3280 })
3281 .await?;
3282 Ok(CommitDiff {
3283 files: response
3284 .files
3285 .into_iter()
3286 .map(|file| CommitFile {
3287 path: Path::new(&file.path).into(),
3288 old_text: file.old_text,
3289 new_text: file.new_text,
3290 })
3291 .collect(),
3292 })
3293 }
3294 }
3295 })
3296 }
3297
3298 fn buffer_store(&self, cx: &App) -> Option<Entity<BufferStore>> {
3299 Some(self.git_store.upgrade()?.read(cx).buffer_store.clone())
3300 }
3301
3302 pub fn stage_entries(
3303 &self,
3304 entries: Vec<RepoPath>,
3305 cx: &mut Context<Self>,
3306 ) -> Task<anyhow::Result<()>> {
3307 if entries.is_empty() {
3308 return Task::ready(Ok(()));
3309 }
3310 let id = self.id;
3311
3312 let mut save_futures = Vec::new();
3313 if let Some(buffer_store) = self.buffer_store(cx) {
3314 buffer_store.update(cx, |buffer_store, cx| {
3315 for path in &entries {
3316 let Some(project_path) = self.repo_path_to_project_path(path, cx) else {
3317 continue;
3318 };
3319 if let Some(buffer) = buffer_store.get_by_path(&project_path, cx) {
3320 if buffer
3321 .read(cx)
3322 .file()
3323 .map_or(false, |file| file.disk_state().exists())
3324 {
3325 save_futures.push(buffer_store.save_buffer(buffer, cx));
3326 }
3327 }
3328 }
3329 })
3330 }
3331
3332 cx.spawn(async move |this, cx| {
3333 for save_future in save_futures {
3334 save_future.await?;
3335 }
3336
3337 this.update(cx, |this, _| {
3338 this.send_job(None, move |git_repo, _cx| async move {
3339 match git_repo {
3340 RepositoryState::Local {
3341 backend,
3342 environment,
3343 ..
3344 } => backend.stage_paths(entries, environment.clone()).await,
3345 RepositoryState::Remote { project_id, client } => {
3346 client
3347 .request(proto::Stage {
3348 project_id: project_id.0,
3349 repository_id: id.to_proto(),
3350 paths: entries
3351 .into_iter()
3352 .map(|repo_path| repo_path.as_ref().to_proto())
3353 .collect(),
3354 })
3355 .await
3356 .context("sending stage request")?;
3357
3358 Ok(())
3359 }
3360 }
3361 })
3362 })?
3363 .await??;
3364
3365 Ok(())
3366 })
3367 }
3368
3369 pub fn unstage_entries(
3370 &self,
3371 entries: Vec<RepoPath>,
3372 cx: &mut Context<Self>,
3373 ) -> Task<anyhow::Result<()>> {
3374 if entries.is_empty() {
3375 return Task::ready(Ok(()));
3376 }
3377 let id = self.id;
3378
3379 let mut save_futures = Vec::new();
3380 if let Some(buffer_store) = self.buffer_store(cx) {
3381 buffer_store.update(cx, |buffer_store, cx| {
3382 for path in &entries {
3383 let Some(project_path) = self.repo_path_to_project_path(path, cx) else {
3384 continue;
3385 };
3386 if let Some(buffer) = buffer_store.get_by_path(&project_path, cx) {
3387 if buffer
3388 .read(cx)
3389 .file()
3390 .map_or(false, |file| file.disk_state().exists())
3391 {
3392 save_futures.push(buffer_store.save_buffer(buffer, cx));
3393 }
3394 }
3395 }
3396 })
3397 }
3398
3399 cx.spawn(async move |this, cx| {
3400 for save_future in save_futures {
3401 save_future.await?;
3402 }
3403
3404 this.update(cx, |this, _| {
3405 this.send_job(None, move |git_repo, _cx| async move {
3406 match git_repo {
3407 RepositoryState::Local {
3408 backend,
3409 environment,
3410 ..
3411 } => backend.unstage_paths(entries, environment).await,
3412 RepositoryState::Remote { project_id, client } => {
3413 client
3414 .request(proto::Unstage {
3415 project_id: project_id.0,
3416 repository_id: id.to_proto(),
3417 paths: entries
3418 .into_iter()
3419 .map(|repo_path| repo_path.as_ref().to_proto())
3420 .collect(),
3421 })
3422 .await
3423 .context("sending unstage request")?;
3424
3425 Ok(())
3426 }
3427 }
3428 })
3429 })?
3430 .await??;
3431
3432 Ok(())
3433 })
3434 }
3435
3436 pub fn stage_all(&self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
3437 let to_stage = self
3438 .cached_status()
3439 .filter(|entry| !entry.status.staging().is_fully_staged())
3440 .map(|entry| entry.repo_path.clone())
3441 .collect();
3442 self.stage_entries(to_stage, cx)
3443 }
3444
3445 pub fn unstage_all(&self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
3446 let to_unstage = self
3447 .cached_status()
3448 .filter(|entry| entry.status.staging().has_staged())
3449 .map(|entry| entry.repo_path.clone())
3450 .collect();
3451 self.unstage_entries(to_unstage, cx)
3452 }
3453
3454 pub fn commit(
3455 &mut self,
3456 message: SharedString,
3457 name_and_email: Option<(SharedString, SharedString)>,
3458 options: CommitOptions,
3459 _cx: &mut App,
3460 ) -> oneshot::Receiver<Result<()>> {
3461 let id = self.id;
3462
3463 self.send_job(Some("git commit".into()), move |git_repo, _cx| async move {
3464 match git_repo {
3465 RepositoryState::Local {
3466 backend,
3467 environment,
3468 ..
3469 } => {
3470 backend
3471 .commit(message, name_and_email, options, environment)
3472 .await
3473 }
3474 RepositoryState::Remote { project_id, client } => {
3475 let (name, email) = name_and_email.unzip();
3476 client
3477 .request(proto::Commit {
3478 project_id: project_id.0,
3479 repository_id: id.to_proto(),
3480 message: String::from(message),
3481 name: name.map(String::from),
3482 email: email.map(String::from),
3483 options: Some(proto::commit::CommitOptions {
3484 amend: options.amend,
3485 }),
3486 })
3487 .await
3488 .context("sending commit request")?;
3489
3490 Ok(())
3491 }
3492 }
3493 })
3494 }
3495
3496 pub fn fetch(
3497 &mut self,
3498 askpass: AskPassDelegate,
3499 _cx: &mut App,
3500 ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
3501 let askpass_delegates = self.askpass_delegates.clone();
3502 let askpass_id = util::post_inc(&mut self.latest_askpass_id);
3503 let id = self.id;
3504
3505 self.send_job(Some("git fetch".into()), move |git_repo, cx| async move {
3506 match git_repo {
3507 RepositoryState::Local {
3508 backend,
3509 environment,
3510 ..
3511 } => backend.fetch(askpass, environment, cx).await,
3512 RepositoryState::Remote { project_id, client } => {
3513 askpass_delegates.lock().insert(askpass_id, askpass);
3514 let _defer = util::defer(|| {
3515 let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
3516 debug_assert!(askpass_delegate.is_some());
3517 });
3518
3519 let response = client
3520 .request(proto::Fetch {
3521 project_id: project_id.0,
3522 repository_id: id.to_proto(),
3523 askpass_id,
3524 })
3525 .await
3526 .context("sending fetch request")?;
3527
3528 Ok(RemoteCommandOutput {
3529 stdout: response.stdout,
3530 stderr: response.stderr,
3531 })
3532 }
3533 }
3534 })
3535 }
3536
3537 pub fn push(
3538 &mut self,
3539 branch: SharedString,
3540 remote: SharedString,
3541 options: Option<PushOptions>,
3542 askpass: AskPassDelegate,
3543 cx: &mut Context<Self>,
3544 ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
3545 let askpass_delegates = self.askpass_delegates.clone();
3546 let askpass_id = util::post_inc(&mut self.latest_askpass_id);
3547 let id = self.id;
3548
3549 let args = options
3550 .map(|option| match option {
3551 PushOptions::SetUpstream => " --set-upstream",
3552 PushOptions::Force => " --force-with-lease",
3553 })
3554 .unwrap_or("");
3555
3556 let updates_tx = self
3557 .git_store()
3558 .and_then(|git_store| match &git_store.read(cx).state {
3559 GitStoreState::Local { downstream, .. } => downstream
3560 .as_ref()
3561 .map(|downstream| downstream.updates_tx.clone()),
3562 _ => None,
3563 });
3564
3565 let this = cx.weak_entity();
3566 self.send_job(
            Some(format!("git push{} {} {}", args, remote, branch).into()),
3568 move |git_repo, mut cx| async move {
3569 match git_repo {
3570 RepositoryState::Local {
3571 backend,
3572 environment,
3573 ..
3574 } => {
3575 let result = backend
3576 .push(
3577 branch.to_string(),
3578 remote.to_string(),
3579 options,
3580 askpass,
3581 environment.clone(),
3582 cx.clone(),
3583 )
3584 .await;
3585 if result.is_ok() {
3586 let branches = backend.branches().await?;
3587 let branch = branches.into_iter().find(|branch| branch.is_head);
                            log::info!("head branch after push is {branch:?}");
3589 let snapshot = this.update(&mut cx, |this, cx| {
3590 this.snapshot.branch = branch;
3591 let snapshot = this.snapshot.clone();
3592 cx.emit(RepositoryEvent::Updated { full_scan: false });
3593 snapshot
3594 })?;
3595 if let Some(updates_tx) = updates_tx {
3596 updates_tx
3597 .unbounded_send(DownstreamUpdate::UpdateRepository(snapshot))
3598 .ok();
3599 }
3600 }
3601 result
3602 }
3603 RepositoryState::Remote { project_id, client } => {
3604 askpass_delegates.lock().insert(askpass_id, askpass);
3605 let _defer = util::defer(|| {
3606 let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
3607 debug_assert!(askpass_delegate.is_some());
3608 });
3609 let response = client
3610 .request(proto::Push {
3611 project_id: project_id.0,
3612 repository_id: id.to_proto(),
3613 askpass_id,
3614 branch_name: branch.to_string(),
3615 remote_name: remote.to_string(),
3616 options: options.map(|options| match options {
3617 PushOptions::Force => proto::push::PushOptions::Force,
3618 PushOptions::SetUpstream => {
3619 proto::push::PushOptions::SetUpstream
3620 }
3621 }
3622 as i32),
3623 })
3624 .await
3625 .context("sending push request")?;
3626
3627 Ok(RemoteCommandOutput {
3628 stdout: response.stdout,
3629 stderr: response.stderr,
3630 })
3631 }
3632 }
3633 },
3634 )
3635 }
3636
3637 pub fn pull(
3638 &mut self,
3639 branch: SharedString,
3640 remote: SharedString,
3641 askpass: AskPassDelegate,
3642 _cx: &mut App,
3643 ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
3644 let askpass_delegates = self.askpass_delegates.clone();
3645 let askpass_id = util::post_inc(&mut self.latest_askpass_id);
3646 let id = self.id;
3647
3648 self.send_job(
3649 Some(format!("git pull {} {}", remote, branch).into()),
3650 move |git_repo, cx| async move {
3651 match git_repo {
3652 RepositoryState::Local {
3653 backend,
3654 environment,
3655 ..
3656 } => {
3657 backend
3658 .pull(
3659 branch.to_string(),
3660 remote.to_string(),
3661 askpass,
3662 environment.clone(),
3663 cx,
3664 )
3665 .await
3666 }
3667 RepositoryState::Remote { project_id, client } => {
3668 askpass_delegates.lock().insert(askpass_id, askpass);
3669 let _defer = util::defer(|| {
3670 let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
3671 debug_assert!(askpass_delegate.is_some());
3672 });
3673 let response = client
3674 .request(proto::Pull {
3675 project_id: project_id.0,
3676 repository_id: id.to_proto(),
3677 askpass_id,
3678 branch_name: branch.to_string(),
3679 remote_name: remote.to_string(),
3680 })
3681 .await
3682 .context("sending pull request")?;
3683
3684 Ok(RemoteCommandOutput {
3685 stdout: response.stdout,
3686 stderr: response.stderr,
3687 })
3688 }
3689 }
3690 },
3691 )
3692 }
3693
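    /// Enqueues an index write for `path` under `GitJobKey::WriteIndex`,
    /// setting the index text for that path to `content`. Once the write
    /// completes, the buffer's diff state records the hunk staging operation
    /// count that this write accounts for.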
3694 fn spawn_set_index_text_job(
3695 &mut self,
3696 path: RepoPath,
3697 content: Option<String>,
3698 hunk_staging_operation_count: Option<usize>,
3699 cx: &mut Context<Self>,
3700 ) -> oneshot::Receiver<anyhow::Result<()>> {
3701 let id = self.id;
3702 let this = cx.weak_entity();
3703 let git_store = self.git_store.clone();
3704 self.send_keyed_job(
3705 Some(GitJobKey::WriteIndex(path.clone())),
3706 None,
3707 move |git_repo, mut cx| async move {
3708 log::debug!("start updating index text for buffer {}", path.display());
3709 match git_repo {
3710 RepositoryState::Local {
3711 backend,
3712 environment,
3713 ..
3714 } => {
3715 backend
3716 .set_index_text(path.clone(), content, environment.clone())
3717 .await?;
3718 }
3719 RepositoryState::Remote { project_id, client } => {
3720 client
3721 .request(proto::SetIndexText {
3722 project_id: project_id.0,
3723 repository_id: id.to_proto(),
3724 path: path.as_ref().to_proto(),
3725 text: content,
3726 })
3727 .await?;
3728 }
3729 }
3730 log::debug!("finish updating index text for buffer {}", path.display());
3731
3732 if let Some(hunk_staging_operation_count) = hunk_staging_operation_count {
3733 let project_path = this
3734 .read_with(&cx, |this, cx| this.repo_path_to_project_path(&path, cx))
3735 .ok()
3736 .flatten();
3737 git_store.update(&mut cx, |git_store, cx| {
3738 let buffer_id = git_store
3739 .buffer_store
3740 .read(cx)
3741 .get_by_path(&project_path?, cx)?
3742 .read(cx)
3743 .remote_id();
3744 let diff_state = git_store.diffs.get(&buffer_id)?;
3745 diff_state.update(cx, |diff_state, _| {
3746 diff_state.hunk_staging_operation_count_as_of_write =
3747 hunk_staging_operation_count;
3748 });
3749 Some(())
3750 })?;
3751 }
3752 Ok(())
3753 },
3754 )
3755 }
3756
3757 pub fn get_remotes(
3758 &mut self,
3759 branch_name: Option<String>,
3760 ) -> oneshot::Receiver<Result<Vec<Remote>>> {
3761 let id = self.id;
3762 self.send_job(None, move |repo, _cx| async move {
3763 match repo {
3764 RepositoryState::Local { backend, .. } => backend.get_remotes(branch_name).await,
3765 RepositoryState::Remote { project_id, client } => {
3766 let response = client
3767 .request(proto::GetRemotes {
3768 project_id: project_id.0,
3769 repository_id: id.to_proto(),
3770 branch_name,
3771 })
3772 .await?;
3773
3774 let remotes = response
3775 .remotes
3776 .into_iter()
3777 .map(|remotes| git::repository::Remote {
3778 name: remotes.name.into(),
3779 })
3780 .collect();
3781
3782 Ok(remotes)
3783 }
3784 }
3785 })
3786 }
3787
3788 pub fn branches(&mut self) -> oneshot::Receiver<Result<Vec<Branch>>> {
3789 let id = self.id;
3790 self.send_job(None, move |repo, _| async move {
3791 match repo {
3792 RepositoryState::Local { backend, .. } => backend.branches().await,
3793 RepositoryState::Remote { project_id, client } => {
3794 let response = client
3795 .request(proto::GitGetBranches {
3796 project_id: project_id.0,
3797 repository_id: id.to_proto(),
3798 })
3799 .await?;
3800
3801 let branches = response
3802 .branches
3803 .into_iter()
3804 .map(|branch| proto_to_branch(&branch))
3805 .collect();
3806
3807 Ok(branches)
3808 }
3809 }
3810 })
3811 }
3812
3813 pub fn diff(&mut self, diff_type: DiffType, _cx: &App) -> oneshot::Receiver<Result<String>> {
3814 let id = self.id;
3815 self.send_job(None, move |repo, _cx| async move {
3816 match repo {
3817 RepositoryState::Local { backend, .. } => backend.diff(diff_type).await,
3818 RepositoryState::Remote { project_id, client } => {
3819 let response = client
3820 .request(proto::GitDiff {
3821 project_id: project_id.0,
3822 repository_id: id.to_proto(),
3823 diff_type: match diff_type {
3824 DiffType::HeadToIndex => {
3825 proto::git_diff::DiffType::HeadToIndex.into()
3826 }
3827 DiffType::HeadToWorktree => {
3828 proto::git_diff::DiffType::HeadToWorktree.into()
3829 }
3830 },
3831 })
3832 .await?;
3833
3834 Ok(response.diff)
3835 }
3836 }
3837 })
3838 }
3839
3840 pub fn create_branch(&mut self, branch_name: String) -> oneshot::Receiver<Result<()>> {
3841 let id = self.id;
3842 self.send_job(
3843 Some(format!("git switch -c {branch_name}").into()),
3844 move |repo, _cx| async move {
3845 match repo {
3846 RepositoryState::Local { backend, .. } => {
3847 backend.create_branch(branch_name).await
3848 }
3849 RepositoryState::Remote { project_id, client } => {
3850 client
3851 .request(proto::GitCreateBranch {
3852 project_id: project_id.0,
3853 repository_id: id.to_proto(),
3854 branch_name,
3855 })
3856 .await?;
3857
3858 Ok(())
3859 }
3860 }
3861 },
3862 )
3863 }
3864
3865 pub fn change_branch(&mut self, branch_name: String) -> oneshot::Receiver<Result<()>> {
3866 let id = self.id;
3867 self.send_job(
3868 Some(format!("git switch {branch_name}").into()),
3869 move |repo, _cx| async move {
3870 match repo {
3871 RepositoryState::Local { backend, .. } => {
3872 backend.change_branch(branch_name).await
3873 }
3874 RepositoryState::Remote { project_id, client } => {
3875 client
3876 .request(proto::GitChangeBranch {
3877 project_id: project_id.0,
3878 repository_id: id.to_proto(),
3879 branch_name,
3880 })
3881 .await?;
3882
3883 Ok(())
3884 }
3885 }
3886 },
3887 )
3888 }
3889
3890 pub fn check_for_pushed_commits(&mut self) -> oneshot::Receiver<Result<Vec<SharedString>>> {
3891 let id = self.id;
3892 self.send_job(None, move |repo, _cx| async move {
3893 match repo {
3894 RepositoryState::Local { backend, .. } => backend.check_for_pushed_commit().await,
3895 RepositoryState::Remote { project_id, client } => {
3896 let response = client
3897 .request(proto::CheckForPushedCommits {
3898 project_id: project_id.0,
3899 repository_id: id.to_proto(),
3900 })
3901 .await?;
3902
3903 let branches = response.pushed_to.into_iter().map(Into::into).collect();
3904
3905 Ok(branches)
3906 }
3907 }
3908 })
3909 }
3910
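/// Records a checkpoint of the repository's current state that can later
/// be compared against, diffed, or restored. Only supported for local
/// repositories.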
3911 pub fn checkpoint(&mut self) -> oneshot::Receiver<Result<GitRepositoryCheckpoint>> {
3912 self.send_job(None, |repo, _cx| async move {
3913 match repo {
3914 RepositoryState::Local { backend, .. } => backend.checkpoint().await,
3915 RepositoryState::Remote { .. } => anyhow::bail!("checkpoints are not yet supported for remote repositories"),
3916 }
3917 })
3918 }
3919
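/// Restores the repository to a previously recorded checkpoint. Only
/// supported for local repositories.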
3920 pub fn restore_checkpoint(
3921 &mut self,
3922 checkpoint: GitRepositoryCheckpoint,
3923 ) -> oneshot::Receiver<Result<()>> {
3924 self.send_job(None, move |repo, _cx| async move {
3925 match repo {
3926 RepositoryState::Local { backend, .. } => {
3927 backend.restore_checkpoint(checkpoint).await
3928 }
3929 RepositoryState::Remote { .. } => anyhow::bail!("restoring checkpoints is not yet supported for remote repositories"),
3930 }
3931 })
3932 }
3933
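/// Applies an `UpdateRepository` message received from the host to the
/// local snapshot, replacing the branch, head commit, and merge-conflict
/// paths and editing the status entries that were added or removed.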
3934 pub(crate) fn apply_remote_update(
3935 &mut self,
3936 update: proto::UpdateRepository,
3937 cx: &mut Context<Self>,
3938 ) -> Result<()> {
3939 let conflicted_paths = TreeSet::from_ordered_entries(
3940 update
3941 .current_merge_conflicts
3942 .into_iter()
3943 .map(|path| RepoPath(Path::new(&path).into())),
3944 );
3945 self.snapshot.branch = update.branch_summary.as_ref().map(proto_to_branch);
3946 self.snapshot.head_commit = update
3947 .head_commit_details
3948 .as_ref()
3949 .map(proto_to_commit_details);
3950
3951 self.snapshot.merge.conflicted_paths = conflicted_paths;
3952
3953 let edits = update
3954 .removed_statuses
3955 .into_iter()
3956 .map(|path| sum_tree::Edit::Remove(PathKey(FromProto::from_proto(path))))
3957 .chain(
3958 update
3959 .updated_statuses
3960 .into_iter()
3961 .filter_map(|updated_status| {
3962 Some(sum_tree::Edit::Insert(updated_status.try_into().log_err()?))
3963 }),
3964 )
3965 .collect::<Vec<_>>();
3966 self.snapshot.statuses_by_path.edit(edits, &());
3967 if update.is_last_update {
3968 self.snapshot.scan_id = update.scan_id;
3969 }
3970 cx.emit(RepositoryEvent::Updated { full_scan: true });
3971 Ok(())
3972 }
3973
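/// Compares two previously recorded checkpoints. Only supported for local
/// repositories.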
3974 pub fn compare_checkpoints(
3975 &mut self,
3976 left: GitRepositoryCheckpoint,
3977 right: GitRepositoryCheckpoint,
3978 ) -> oneshot::Receiver<Result<bool>> {
3979 self.send_job(None, move |repo, _cx| async move {
3980 match repo {
3981 RepositoryState::Local { backend, .. } => {
3982 backend.compare_checkpoints(left, right).await
3983 }
3984 RepositoryState::Remote { .. } => anyhow::bail!("comparing checkpoints is not yet supported for remote repositories"),
3985 }
3986 })
3987 }
3988
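/// Produces a textual diff between two previously recorded checkpoints.
/// Only supported for local repositories.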
3989 pub fn diff_checkpoints(
3990 &mut self,
3991 base_checkpoint: GitRepositoryCheckpoint,
3992 target_checkpoint: GitRepositoryCheckpoint,
3993 ) -> oneshot::Receiver<Result<String>> {
3994 self.send_job(None, move |repo, _cx| async move {
3995 match repo {
3996 RepositoryState::Local { backend, .. } => {
3997 backend
3998 .diff_checkpoints(base_checkpoint, target_checkpoint)
3999 .await
4000 }
4001 RepositoryState::Remote { .. } => anyhow::bail!("diffing checkpoints is not yet supported for remote repositories"),
4002 }
4003 })
4004 }
4005
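/// Queues a full git status scan on the repository's job worker. The job
/// is keyed so that redundant scan requests collapse into one; the
/// resulting snapshot is applied to this entity and, if a sender is
/// provided, forwarded downstream.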
4006 fn schedule_scan(
4007 &mut self,
4008 updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
4009 cx: &mut Context<Self>,
4010 ) {
4011 let this = cx.weak_entity();
4012 let _ = self.send_keyed_job(
4013 Some(GitJobKey::ReloadGitState),
4014 None,
4015 |state, mut cx| async move {
4016 log::debug!("run scheduled git status scan");
4017
4018 let Some(this) = this.upgrade() else {
4019 return Ok(());
4020 };
4021 let RepositoryState::Local { backend, .. } = state else {
4022 bail!("not a local repository")
4023 };
4024 let (snapshot, events) = this
4025 .read_with(&mut cx, |this, _| {
4026 compute_snapshot(
4027 this.id,
4028 this.work_directory_abs_path.clone(),
4029 this.snapshot.clone(),
4030 backend.clone(),
4031 )
4032 })?
4033 .await?;
4034 this.update(&mut cx, |this, cx| {
4035 this.snapshot = snapshot.clone();
4036 for event in events {
4037 cx.emit(event);
4038 }
4039 })?;
4040 if let Some(updates_tx) = updates_tx {
4041 updates_tx
4042 .unbounded_send(DownstreamUpdate::UpdateRepository(snapshot))
4043 .ok();
4044 }
4045 Ok(())
4046 },
4047 );
4048 }
4049
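/// Spawns the worker loop that services git jobs for a local repository.
/// It resolves the project's directory environment, opens the repository
/// through the filesystem backend, and then runs queued jobs one at a
/// time, skipping any keyed job that is superseded by a newer job with the
/// same key.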
4050 fn spawn_local_git_worker(
4051 work_directory_abs_path: Arc<Path>,
4052 dot_git_abs_path: Arc<Path>,
4053 _repository_dir_abs_path: Arc<Path>,
4054 _common_dir_abs_path: Arc<Path>,
4055 project_environment: WeakEntity<ProjectEnvironment>,
4056 fs: Arc<dyn Fs>,
4057 cx: &mut Context<Self>,
4058 ) -> mpsc::UnboundedSender<GitJob> {
4059 let (job_tx, mut job_rx) = mpsc::unbounded::<GitJob>();
4060
4061 cx.spawn(async move |_, cx| {
4062 let environment = project_environment
4063 .upgrade()
4064 .context("missing project environment")?
4065 .update(cx, |project_environment, cx| {
4066 project_environment.get_directory_environment(work_directory_abs_path.clone(), cx)
4067 })?
4068 .await
4069 .unwrap_or_else(|| {
4070 log::error!("failed to get working directory environment for repository {work_directory_abs_path:?}");
4071 HashMap::default()
4072 });
4073 let backend = cx
4074 .background_spawn(async move {
4075 fs.open_repo(&dot_git_abs_path)
4076 .with_context(|| format!("opening repository at {dot_git_abs_path:?}"))
4077 })
4078 .await?;
4079
4080 if let Some(git_hosting_provider_registry) =
4081 cx.update(|cx| GitHostingProviderRegistry::try_global(cx))?
4082 {
4083 git_hosting_providers::register_additional_providers(
4084 git_hosting_provider_registry,
4085 backend.clone(),
4086 );
4087 }
4088
4089 let state = RepositoryState::Local {
4090 backend,
4091 environment: Arc::new(environment),
4092 };
4093 let mut jobs = VecDeque::new();
4094 loop {
4095 while let Ok(Some(next_job)) = job_rx.try_next() {
4096 jobs.push_back(next_job);
4097 }
4098
4099 if let Some(job) = jobs.pop_front() {
4100 if let Some(current_key) = &job.key {
4101 if jobs
4102 .iter()
4103 .any(|other_job| other_job.key.as_ref() == Some(current_key))
4104 {
4105 continue;
4106 }
4107 }
4108 (job.job)(state.clone(), cx).await;
4109 } else if let Some(job) = job_rx.next().await {
4110 jobs.push_back(job);
4111 } else {
4112 break;
4113 }
4114 }
4115 anyhow::Ok(())
4116 })
4117 .detach_and_log_err(cx);
4118
4119 job_tx
4120 }
4121
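/// Spawns the equivalent worker loop for a repository hosted remotely,
/// where each job is serviced over the RPC client rather than a local git
/// backend.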
4122 fn spawn_remote_git_worker(
4123 project_id: ProjectId,
4124 client: AnyProtoClient,
4125 cx: &mut Context<Self>,
4126 ) -> mpsc::UnboundedSender<GitJob> {
4127 let (job_tx, mut job_rx) = mpsc::unbounded::<GitJob>();
4128
4129 cx.spawn(async move |_, cx| {
4130 let state = RepositoryState::Remote { project_id, client };
4131 let mut jobs = VecDeque::new();
4132 loop {
4133 while let Ok(Some(next_job)) = job_rx.try_next() {
4134 jobs.push_back(next_job);
4135 }
4136
4137 if let Some(job) = jobs.pop_front() {
4138 if let Some(current_key) = &job.key {
4139 if jobs
4140 .iter()
4141 .any(|other_job| other_job.key.as_ref() == Some(current_key))
4142 {
4143 continue;
4144 }
4145 }
4146 (job.job)(state.clone(), cx).await;
4147 } else if let Some(job) = job_rx.next().await {
4148 jobs.push_back(job);
4149 } else {
4150 break;
4151 }
4152 }
4153 anyhow::Ok(())
4154 })
4155 .detach_and_log_err(cx);
4156
4157 job_tx
4158 }
4159
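/// Loads the staged (index) text for a buffer's file, reading the local
/// index directly or requesting it via `OpenUnstagedDiff` when the
/// repository is remote.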
4160 fn load_staged_text(
4161 &mut self,
4162 buffer_id: BufferId,
4163 repo_path: RepoPath,
4164 cx: &App,
4165 ) -> Task<Result<Option<String>>> {
4166 let rx = self.send_job(None, move |state, _| async move {
4167 match state {
4168 RepositoryState::Local { backend, .. } => {
4169 anyhow::Ok(backend.load_index_text(repo_path).await)
4170 }
4171 RepositoryState::Remote { project_id, client } => {
4172 let response = client
4173 .request(proto::OpenUnstagedDiff {
4174 project_id: project_id.to_proto(),
4175 buffer_id: buffer_id.to_proto(),
4176 })
4177 .await?;
4178 Ok(response.staged_text)
4179 }
4180 }
4181 });
4182 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
4183 }
4184
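/// Loads the committed (HEAD) text and index text for a buffer's file,
/// expressed as a `DiffBasesChange` that collapses to `SetBoth` when the
/// index matches HEAD.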
4185 fn load_committed_text(
4186 &mut self,
4187 buffer_id: BufferId,
4188 repo_path: RepoPath,
4189 cx: &App,
4190 ) -> Task<Result<DiffBasesChange>> {
4191 let rx = self.send_job(None, move |state, _| async move {
4192 match state {
4193 RepositoryState::Local { backend, .. } => {
4194 let committed_text = backend.load_committed_text(repo_path.clone()).await;
4195 let staged_text = backend.load_index_text(repo_path).await;
4196 let diff_bases_change = if committed_text == staged_text {
4197 DiffBasesChange::SetBoth(committed_text)
4198 } else {
4199 DiffBasesChange::SetEach {
4200 index: staged_text,
4201 head: committed_text,
4202 }
4203 };
4204 anyhow::Ok(diff_bases_change)
4205 }
4206 RepositoryState::Remote { project_id, client } => {
4207 use proto::open_uncommitted_diff_response::Mode;
4208
4209 let response = client
4210 .request(proto::OpenUncommittedDiff {
4211 project_id: project_id.to_proto(),
4212 buffer_id: buffer_id.to_proto(),
4213 })
4214 .await?;
4215 let mode = Mode::from_i32(response.mode).context("Invalid mode")?;
4216 let bases = match mode {
4217 Mode::IndexMatchesHead => DiffBasesChange::SetBoth(response.committed_text),
4218 Mode::IndexAndHead => DiffBasesChange::SetEach {
4219 head: response.committed_text,
4220 index: response.staged_text,
4221 },
4222 };
4223 Ok(bases)
4224 }
4225 }
4226 });
4227
4228 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
4229 }
4230
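/// Handles change notifications for a set of repository paths: their
/// statuses are re-queried in a keyed job, the snapshot is edited with any
/// entries that changed or disappeared, and the updated snapshot is
/// forwarded downstream when a sender is provided.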
4231 fn paths_changed(
4232 &mut self,
4233 paths: Vec<RepoPath>,
4234 updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
4235 cx: &mut Context<Self>,
4236 ) {
4237 self.paths_needing_status_update.extend(paths);
4238
4239 let this = cx.weak_entity();
4240 let _ = self.send_keyed_job(
4241 Some(GitJobKey::RefreshStatuses),
4242 None,
4243 |state, mut cx| async move {
4244 let (prev_snapshot, mut changed_paths) = this.update(&mut cx, |this, _| {
4245 (
4246 this.snapshot.clone(),
4247 mem::take(&mut this.paths_needing_status_update),
4248 )
4249 })?;
4250 let RepositoryState::Local { backend, .. } = state else {
4251 bail!("not a local repository")
4252 };
4253
4254 let paths = changed_paths.iter().cloned().collect::<Vec<_>>();
4255 let statuses = backend.status(&paths).await?;
4256
4257 let changed_path_statuses = cx
4258 .background_spawn(async move {
4259 let mut changed_path_statuses = Vec::new();
4260 let prev_statuses = prev_snapshot.statuses_by_path.clone();
4261 let mut cursor = prev_statuses.cursor::<PathProgress>(&());
4262
4263 for (repo_path, status) in &*statuses.entries {
4264 changed_paths.remove(repo_path);
4265 if cursor.seek_forward(&PathTarget::Path(repo_path), Bias::Left, &()) {
4266 if cursor.item().is_some_and(|entry| entry.status == *status) {
4267 continue;
4268 }
4269 }
4270
4271 changed_path_statuses.push(Edit::Insert(StatusEntry {
4272 repo_path: repo_path.clone(),
4273 status: *status,
4274 }));
4275 }
4276 let mut cursor = prev_statuses.cursor::<PathProgress>(&());
4277 for path in changed_paths.into_iter() {
4278 if cursor.seek_forward(&PathTarget::Path(&path), Bias::Left, &()) {
4279 changed_path_statuses.push(Edit::Remove(PathKey(path.0)));
4280 }
4281 }
4282 changed_path_statuses
4283 })
4284 .await;
4285
4286 this.update(&mut cx, |this, cx| {
4287 if !changed_path_statuses.is_empty() {
4288 this.snapshot
4289 .statuses_by_path
4290 .edit(changed_path_statuses, &());
4291 this.snapshot.scan_id += 1;
4292 if let Some(updates_tx) = updates_tx {
4293 updates_tx
4294 .unbounded_send(DownstreamUpdate::UpdateRepository(
4295 this.snapshot.clone(),
4296 ))
4297 .ok();
4298 }
4299 }
4300 cx.emit(RepositoryEvent::Updated { full_scan: false });
4301 })
4302 },
4303 );
4304 }
4305
4306 /// Returns the currently running git command and the time it started, if any.
4307 pub fn current_job(&self) -> Option<JobInfo> {
4308 self.active_jobs.values().next().cloned()
4309 }
4310
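/// Enqueues a no-op job, returning a receiver that resolves once all
/// previously queued jobs have been processed.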
4311 pub fn barrier(&mut self) -> oneshot::Receiver<()> {
4312 self.send_job(None, |_, _| async {})
4313 }
4314}
4315
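/// Builds a permalink for a file vendored in the Cargo registry source
/// cache by locating the crate's `.cargo_vcs_info.json`, reading the
/// upstream repository URL from its `Cargo.toml`, and linking to the
/// recorded commit SHA.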
4316fn get_permalink_in_rust_registry_src(
4317 provider_registry: Arc<GitHostingProviderRegistry>,
4318 path: PathBuf,
4319 selection: Range<u32>,
4320) -> Result<url::Url> {
4321 #[derive(Deserialize)]
4322 struct CargoVcsGit {
4323 sha1: String,
4324 }
4325
4326 #[derive(Deserialize)]
4327 struct CargoVcsInfo {
4328 git: CargoVcsGit,
4329 path_in_vcs: String,
4330 }
4331
4332 #[derive(Deserialize)]
4333 struct CargoPackage {
4334 repository: String,
4335 }
4336
4337 #[derive(Deserialize)]
4338 struct CargoToml {
4339 package: CargoPackage,
4340 }
4341
4342 let Some((dir, cargo_vcs_info_json)) = path.ancestors().skip(1).find_map(|dir| {
4343 let json = std::fs::read_to_string(dir.join(".cargo_vcs_info.json")).ok()?;
4344 Some((dir, json))
4345 }) else {
4346 bail!("No .cargo_vcs_info.json found in parent directories")
4347 };
4348 let cargo_vcs_info = serde_json::from_str::<CargoVcsInfo>(&cargo_vcs_info_json)?;
4349 let cargo_toml = std::fs::read_to_string(dir.join("Cargo.toml"))?;
4350 let manifest = toml::from_str::<CargoToml>(&cargo_toml)?;
4351 let (provider, remote) = parse_git_remote_url(provider_registry, &manifest.package.repository)
4352 .context("parsing package.repository field of manifest")?;
4353 let path = PathBuf::from(cargo_vcs_info.path_in_vcs).join(path.strip_prefix(dir).unwrap());
4354 let permalink = provider.build_permalink(
4355 remote,
4356 BuildPermalinkParams {
4357 sha: &cargo_vcs_info.git.sha1,
4358 path: &path.to_string_lossy(),
4359 selection: Some(selection),
4360 },
4361 );
4362 Ok(permalink)
4363}
4364
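/// Converts a `git blame` result into its protobuf representation; `None`
/// becomes a response with no blame data.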
4365fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
4366 let Some(blame) = blame else {
4367 return proto::BlameBufferResponse {
4368 blame_response: None,
4369 };
4370 };
4371
4372 let entries = blame
4373 .entries
4374 .into_iter()
4375 .map(|entry| proto::BlameEntry {
4376 sha: entry.sha.as_bytes().into(),
4377 start_line: entry.range.start,
4378 end_line: entry.range.end,
4379 original_line_number: entry.original_line_number,
4380 author: entry.author.clone(),
4381 author_mail: entry.author_mail.clone(),
4382 author_time: entry.author_time,
4383 author_tz: entry.author_tz.clone(),
4384 committer: entry.committer_name.clone(),
4385 committer_mail: entry.committer_email.clone(),
4386 committer_time: entry.committer_time,
4387 committer_tz: entry.committer_tz.clone(),
4388 summary: entry.summary.clone(),
4389 previous: entry.previous.clone(),
4390 filename: entry.filename.clone(),
4391 })
4392 .collect::<Vec<_>>();
4393
4394 let messages = blame
4395 .messages
4396 .into_iter()
4397 .map(|(oid, message)| proto::CommitMessage {
4398 oid: oid.as_bytes().into(),
4399 message,
4400 })
4401 .collect::<Vec<_>>();
4402
4403 proto::BlameBufferResponse {
4404 blame_response: Some(proto::blame_buffer_response::BlameResponse {
4405 entries,
4406 messages,
4407 remote_url: blame.remote_url,
4408 }),
4409 }
4410}
4411
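/// Reconstructs a `git::blame::Blame` from its protobuf representation,
/// dropping any entries or commit messages whose object ids fail to parse.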
4412fn deserialize_blame_buffer_response(
4413 response: proto::BlameBufferResponse,
4414) -> Option<git::blame::Blame> {
4415 let response = response.blame_response?;
4416 let entries = response
4417 .entries
4418 .into_iter()
4419 .filter_map(|entry| {
4420 Some(git::blame::BlameEntry {
4421 sha: git::Oid::from_bytes(&entry.sha).ok()?,
4422 range: entry.start_line..entry.end_line,
4423 original_line_number: entry.original_line_number,
4424 committer_name: entry.committer,
4425 committer_time: entry.committer_time,
4426 committer_tz: entry.committer_tz,
4427 committer_email: entry.committer_mail,
4428 author: entry.author,
4429 author_mail: entry.author_mail,
4430 author_time: entry.author_time,
4431 author_tz: entry.author_tz,
4432 summary: entry.summary,
4433 previous: entry.previous,
4434 filename: entry.filename,
4435 })
4436 })
4437 .collect::<Vec<_>>();
4438
4439 let messages = response
4440 .messages
4441 .into_iter()
4442 .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
4443 .collect::<HashMap<_, _>>();
4444
4445 Some(Blame {
4446 entries,
4447 messages,
4448 remote_url: response.remote_url,
4449 })
4450}
4451
4452fn branch_to_proto(branch: &git::repository::Branch) -> proto::Branch {
4453 proto::Branch {
4454 is_head: branch.is_head,
4455 ref_name: branch.ref_name.to_string(),
4456 unix_timestamp: branch
4457 .most_recent_commit
4458 .as_ref()
4459 .map(|commit| commit.commit_timestamp as u64),
4460 upstream: branch.upstream.as_ref().map(|upstream| proto::GitUpstream {
4461 ref_name: upstream.ref_name.to_string(),
4462 tracking: upstream
4463 .tracking
4464 .status()
4465 .map(|upstream| proto::UpstreamTracking {
4466 ahead: upstream.ahead as u64,
4467 behind: upstream.behind as u64,
4468 }),
4469 }),
4470 most_recent_commit: branch
4471 .most_recent_commit
4472 .as_ref()
4473 .map(|commit| proto::CommitSummary {
4474 sha: commit.sha.to_string(),
4475 subject: commit.subject.to_string(),
4476 commit_timestamp: commit.commit_timestamp,
4477 }),
4478 }
4479}
4480
4481fn proto_to_branch(proto: &proto::Branch) -> git::repository::Branch {
4482 git::repository::Branch {
4483 is_head: proto.is_head,
4484 ref_name: proto.ref_name.clone().into(),
4485 upstream: proto
4486 .upstream
4487 .as_ref()
4488 .map(|upstream| git::repository::Upstream {
4489 ref_name: upstream.ref_name.to_string().into(),
4490 tracking: upstream
4491 .tracking
4492 .as_ref()
4493 .map(|tracking| {
4494 git::repository::UpstreamTracking::Tracked(UpstreamTrackingStatus {
4495 ahead: tracking.ahead as u32,
4496 behind: tracking.behind as u32,
4497 })
4498 })
4499 .unwrap_or(git::repository::UpstreamTracking::Gone),
4500 }),
4501 most_recent_commit: proto.most_recent_commit.as_ref().map(|commit| {
4502 git::repository::CommitSummary {
4503 sha: commit.sha.to_string().into(),
4504 subject: commit.subject.to_string().into(),
4505 commit_timestamp: commit.commit_timestamp,
4506 has_parent: true,
4507 }
4508 }),
4509 }
4510}
4511
4512fn commit_details_to_proto(commit: &CommitDetails) -> proto::GitCommitDetails {
4513 proto::GitCommitDetails {
4514 sha: commit.sha.to_string(),
4515 message: commit.message.to_string(),
4516 commit_timestamp: commit.commit_timestamp,
4517 author_email: commit.author_email.to_string(),
4518 author_name: commit.author_name.to_string(),
4519 }
4520}
4521
4522fn proto_to_commit_details(proto: &proto::GitCommitDetails) -> CommitDetails {
4523 CommitDetails {
4524 sha: proto.sha.clone().into(),
4525 message: proto.message.clone().into(),
4526 commit_timestamp: proto.commit_timestamp,
4527 author_email: proto.author_email.clone().into(),
4528 author_name: proto.author_name.clone().into(),
4529 }
4530}
4531
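/// Recomputes a repository snapshot by querying the git backend for
/// branches, statuses, merge state, and the head commit, and returns the
/// `RepositoryEvent`s that should be emitted relative to the previous
/// snapshot.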
4532async fn compute_snapshot(
4533 id: RepositoryId,
4534 work_directory_abs_path: Arc<Path>,
4535 prev_snapshot: RepositorySnapshot,
4536 backend: Arc<dyn GitRepository>,
4537) -> Result<(RepositorySnapshot, Vec<RepositoryEvent>)> {
4538 let mut events = Vec::new();
4539 let branches = backend.branches().await?;
4540 let branch = branches.into_iter().find(|branch| branch.is_head);
4541 let statuses = backend.status(&[WORK_DIRECTORY_REPO_PATH.clone()]).await?;
4542 let statuses_by_path = SumTree::from_iter(
4543 statuses
4544 .entries
4545 .iter()
4546 .map(|(repo_path, status)| StatusEntry {
4547 repo_path: repo_path.clone(),
4548 status: *status,
4549 }),
4550 &(),
4551 );
4552 let (merge_details, merge_heads_changed) =
4553 MergeDetails::load(&backend, &statuses_by_path, &prev_snapshot).await?;
4554 log::debug!("new merge details (changed={merge_heads_changed:?}): {merge_details:?}");
4555
4556 if merge_heads_changed
4557 || branch != prev_snapshot.branch
4558 || statuses_by_path != prev_snapshot.statuses_by_path
4559 {
4560 events.push(RepositoryEvent::Updated { full_scan: true });
4561 }
4562
4563 // Cache the merge conflict paths so that staging or unstaging files doesn't change them;
4564 // they are only refreshed when the merge heads change (e.g. when a commit is made).
4565 if merge_heads_changed {
4566 events.push(RepositoryEvent::MergeHeadsChanged);
4567 }
4568
4569 // The head commit is still needed when `branch` is `None`, e.g. in a detached HEAD state.
4570 let head_commit = match backend.head_sha().await {
4571 Some(head_sha) => backend.show(head_sha).await.log_err(),
4572 None => None,
4573 };
4574
4575 let snapshot = RepositorySnapshot {
4576 id,
4577 statuses_by_path,
4578 work_directory_abs_path,
4579 scan_id: prev_snapshot.scan_id + 1,
4580 branch,
4581 head_commit,
4582 merge: merge_details,
4583 };
4584
4585 Ok((snapshot, events))
4586}
4587
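/// Decodes a `FileStatus` from its protobuf form, falling back to the flat
/// `simple_status` code when no structured variant is present.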
4588fn status_from_proto(
4589 simple_status: i32,
4590 status: Option<proto::GitFileStatus>,
4591) -> anyhow::Result<FileStatus> {
4592 use proto::git_file_status::Variant;
4593
4594 let Some(variant) = status.and_then(|status| status.variant) else {
4595 let code = proto::GitStatus::from_i32(simple_status)
4596 .with_context(|| format!("Invalid git status code: {simple_status}"))?;
4597 let result = match code {
4598 proto::GitStatus::Added => TrackedStatus {
4599 worktree_status: StatusCode::Added,
4600 index_status: StatusCode::Unmodified,
4601 }
4602 .into(),
4603 proto::GitStatus::Modified => TrackedStatus {
4604 worktree_status: StatusCode::Modified,
4605 index_status: StatusCode::Unmodified,
4606 }
4607 .into(),
4608 proto::GitStatus::Conflict => UnmergedStatus {
4609 first_head: UnmergedStatusCode::Updated,
4610 second_head: UnmergedStatusCode::Updated,
4611 }
4612 .into(),
4613 proto::GitStatus::Deleted => TrackedStatus {
4614 worktree_status: StatusCode::Deleted,
4615 index_status: StatusCode::Unmodified,
4616 }
4617 .into(),
4618 _ => anyhow::bail!("Invalid code for simple status: {simple_status}"),
4619 };
4620 return Ok(result);
4621 };
4622
4623 let result = match variant {
4624 Variant::Untracked(_) => FileStatus::Untracked,
4625 Variant::Ignored(_) => FileStatus::Ignored,
4626 Variant::Unmerged(unmerged) => {
4627 let [first_head, second_head] =
4628 [unmerged.first_head, unmerged.second_head].map(|head| {
4629 let code = proto::GitStatus::from_i32(head)
4630 .with_context(|| format!("Invalid git status code: {head}"))?;
4631 let result = match code {
4632 proto::GitStatus::Added => UnmergedStatusCode::Added,
4633 proto::GitStatus::Updated => UnmergedStatusCode::Updated,
4634 proto::GitStatus::Deleted => UnmergedStatusCode::Deleted,
4635 _ => anyhow::bail!("Invalid code for unmerged status: {code:?}"),
4636 };
4637 Ok(result)
4638 });
4639 let [first_head, second_head] = [first_head?, second_head?];
4640 UnmergedStatus {
4641 first_head,
4642 second_head,
4643 }
4644 .into()
4645 }
4646 Variant::Tracked(tracked) => {
4647 let [index_status, worktree_status] = [tracked.index_status, tracked.worktree_status]
4648 .map(|status| {
4649 let code = proto::GitStatus::from_i32(status)
4650 .with_context(|| format!("Invalid git status code: {status}"))?;
4651 let result = match code {
4652 proto::GitStatus::Modified => StatusCode::Modified,
4653 proto::GitStatus::TypeChanged => StatusCode::TypeChanged,
4654 proto::GitStatus::Added => StatusCode::Added,
4655 proto::GitStatus::Deleted => StatusCode::Deleted,
4656 proto::GitStatus::Renamed => StatusCode::Renamed,
4657 proto::GitStatus::Copied => StatusCode::Copied,
4658 proto::GitStatus::Unmodified => StatusCode::Unmodified,
4659 _ => anyhow::bail!("Invalid code for tracked status: {code:?}"),
4660 };
4661 Ok(result)
4662 });
4663 let [index_status, worktree_status] = [index_status?, worktree_status?];
4664 TrackedStatus {
4665 index_status,
4666 worktree_status,
4667 }
4668 .into()
4669 }
4670 };
4671 Ok(result)
4672}
4673
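/// Encodes a `FileStatus` as its structured protobuf variant (untracked,
/// ignored, unmerged, or tracked).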
4674fn status_to_proto(status: FileStatus) -> proto::GitFileStatus {
4675 use proto::git_file_status::{Tracked, Unmerged, Variant};
4676
4677 let variant = match status {
4678 FileStatus::Untracked => Variant::Untracked(Default::default()),
4679 FileStatus::Ignored => Variant::Ignored(Default::default()),
4680 FileStatus::Unmerged(UnmergedStatus {
4681 first_head,
4682 second_head,
4683 }) => Variant::Unmerged(Unmerged {
4684 first_head: unmerged_status_to_proto(first_head),
4685 second_head: unmerged_status_to_proto(second_head),
4686 }),
4687 FileStatus::Tracked(TrackedStatus {
4688 index_status,
4689 worktree_status,
4690 }) => Variant::Tracked(Tracked {
4691 index_status: tracked_status_to_proto(index_status),
4692 worktree_status: tracked_status_to_proto(worktree_status),
4693 }),
4694 };
4695 proto::GitFileStatus {
4696 variant: Some(variant),
4697 }
4698}
4699
4700fn unmerged_status_to_proto(code: UnmergedStatusCode) -> i32 {
4701 match code {
4702 UnmergedStatusCode::Added => proto::GitStatus::Added as _,
4703 UnmergedStatusCode::Deleted => proto::GitStatus::Deleted as _,
4704 UnmergedStatusCode::Updated => proto::GitStatus::Updated as _,
4705 }
4706}
4707
4708fn tracked_status_to_proto(code: StatusCode) -> i32 {
4709 match code {
4710 StatusCode::Added => proto::GitStatus::Added as _,
4711 StatusCode::Deleted => proto::GitStatus::Deleted as _,
4712 StatusCode::Modified => proto::GitStatus::Modified as _,
4713 StatusCode::Renamed => proto::GitStatus::Renamed as _,
4714 StatusCode::TypeChanged => proto::GitStatus::TypeChanged as _,
4715 StatusCode::Copied => proto::GitStatus::Copied as _,
4716 StatusCode::Unmodified => proto::GitStatus::Unmodified as _,
4717 }
4718}