1mod conflict_set;
2pub mod git_traversal;
3
4use crate::{
5 ProjectEnvironment, ProjectItem, ProjectPath,
6 buffer_store::{BufferStore, BufferStoreEvent},
7 worktree_store::{WorktreeStore, WorktreeStoreEvent},
8};
9use anyhow::{Context as _, Result, anyhow, bail};
10use askpass::AskPassDelegate;
11use buffer_diff::{BufferDiff, BufferDiffEvent};
12use client::ProjectId;
13use collections::HashMap;
14pub use conflict_set::{ConflictRegion, ConflictSet, ConflictSetSnapshot, ConflictSetUpdate};
15use fs::Fs;
16use futures::{
17 FutureExt, StreamExt as _,
18 channel::{mpsc, oneshot},
19 future::{self, Shared, try_join_all},
20};
21use git::{
22 BuildPermalinkParams, GitHostingProviderRegistry, WORK_DIRECTORY_REPO_PATH,
23 blame::Blame,
24 parse_git_remote_url,
25 repository::{
26 Branch, CommitDetails, CommitDiff, CommitFile, CommitOptions, DiffType, GitRepository,
27 GitRepositoryCheckpoint, PushOptions, Remote, RemoteCommandOutput, RepoPath, ResetMode,
28 UpstreamTrackingStatus,
29 },
30 status::{
31 FileStatus, GitSummary, StatusCode, TrackedStatus, UnmergedStatus, UnmergedStatusCode,
32 },
33};
34use gpui::{
35 App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Subscription, Task,
36 WeakEntity,
37};
38use language::{
39 Buffer, BufferEvent, Language, LanguageRegistry,
40 proto::{deserialize_version, serialize_version},
41};
42use parking_lot::Mutex;
43use postage::stream::Stream as _;
44use rpc::{
45 AnyProtoClient, TypedEnvelope,
46 proto::{self, FromProto, SSH_PROJECT_ID, ToProto, git_reset, split_repository_update},
47};
48use serde::Deserialize;
49use std::{
50 cmp::Ordering,
51 collections::{BTreeSet, VecDeque},
52 future::Future,
53 mem,
54 ops::Range,
55 path::{Path, PathBuf},
56 sync::{
57 Arc,
58 atomic::{self, AtomicU64},
59 },
60 time::Instant,
61};
62use sum_tree::{Edit, SumTree, TreeSet};
63use text::{Bias, BufferId};
64use util::{ResultExt, debug_panic, post_inc};
65use worktree::{
66 File, PathKey, PathProgress, PathSummary, PathTarget, UpdatedGitRepositoriesSet,
67 UpdatedGitRepository, Worktree,
68};
69
70pub struct GitStore {
71 state: GitStoreState,
72 buffer_store: Entity<BufferStore>,
73 worktree_store: Entity<WorktreeStore>,
74 repositories: HashMap<RepositoryId, Entity<Repository>>,
75 active_repo_id: Option<RepositoryId>,
76 #[allow(clippy::type_complexity)]
77 loading_diffs:
78 HashMap<(BufferId, DiffKind), Shared<Task<Result<Entity<BufferDiff>, Arc<anyhow::Error>>>>>,
79 diffs: HashMap<BufferId, Entity<BufferGitState>>,
80 shared_diffs: HashMap<proto::PeerId, HashMap<BufferId, SharedDiffs>>,
81 _subscriptions: Vec<Subscription>,
82}
83
84#[derive(Default)]
85struct SharedDiffs {
86 unstaged: Option<Entity<BufferDiff>>,
87 uncommitted: Option<Entity<BufferDiff>>,
88}
89
90struct BufferGitState {
91 unstaged_diff: Option<WeakEntity<BufferDiff>>,
92 uncommitted_diff: Option<WeakEntity<BufferDiff>>,
93 conflict_set: Option<WeakEntity<ConflictSet>>,
94 recalculate_diff_task: Option<Task<Result<()>>>,
95 reparse_conflict_markers_task: Option<Task<Result<()>>>,
96 language: Option<Arc<Language>>,
97 language_registry: Option<Arc<LanguageRegistry>>,
98 conflict_updated_futures: Vec<oneshot::Sender<()>>,
99 recalculating_tx: postage::watch::Sender<bool>,
100
101 /// These operation counts are used to ensure that head and index text
102 /// values read from the git repository are up-to-date with any hunk staging
103 /// operations that have been performed on the BufferDiff.
104 ///
105 /// The operation count is incremented immediately when the user initiates a
106 /// hunk stage/unstage operation. Then, upon finishing writing the new index
107 /// text do disk, the `operation count as of write` is updated to reflect
108 /// the operation count that prompted the write.
109 hunk_staging_operation_count: usize,
110 hunk_staging_operation_count_as_of_write: usize,
111
112 head_text: Option<Arc<String>>,
113 index_text: Option<Arc<String>>,
114 head_changed: bool,
115 index_changed: bool,
116 language_changed: bool,
117}
118
/// Describes which diff base texts (index and/or HEAD) are being replaced.
/// A `None` text means no text is supplied for that base.
#[derive(Clone, Debug)]
enum DiffBasesChange {
    /// Replace only the index text.
    SetIndex(Option<String>),
    /// Replace only the HEAD text.
    SetHead(Option<String>),
    /// Replace both texts, each with its own value.
    SetEach {
        index: Option<String>,
        head: Option<String>,
    },
    /// Replace both texts with one shared value.
    SetBoth(Option<String>),
}
129
/// The two kinds of buffer diff this store can open; also part of the key of
/// `GitStore::loading_diffs`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum DiffKind {
    /// Diff opened by `open_unstaged_diff` (based on the staged text).
    Unstaged,
    /// Diff opened by `open_uncommitted_diff` (based on the committed text).
    Uncommitted,
}
135
136enum GitStoreState {
137 Local {
138 next_repository_id: Arc<AtomicU64>,
139 downstream: Option<LocalDownstreamState>,
140 project_environment: Entity<ProjectEnvironment>,
141 fs: Arc<dyn Fs>,
142 },
143 Ssh {
144 upstream_client: AnyProtoClient,
145 upstream_project_id: ProjectId,
146 downstream: Option<(AnyProtoClient, ProjectId)>,
147 },
148 Remote {
149 upstream_client: AnyProtoClient,
150 upstream_project_id: ProjectId,
151 },
152}
153
154enum DownstreamUpdate {
155 UpdateRepository(RepositorySnapshot),
156 RemoveRepository(RepositoryId),
157}
158
159struct LocalDownstreamState {
160 client: AnyProtoClient,
161 project_id: ProjectId,
162 updates_tx: mpsc::UnboundedSender<DownstreamUpdate>,
163 _task: Task<Result<()>>,
164}
165
166#[derive(Clone)]
167pub struct GitStoreCheckpoint {
168 checkpoints_by_work_dir_abs_path: HashMap<Arc<Path>, GitRepositoryCheckpoint>,
169}
170
171#[derive(Clone, Debug, PartialEq, Eq)]
172pub struct StatusEntry {
173 pub repo_path: RepoPath,
174 pub status: FileStatus,
175}
176
177impl StatusEntry {
178 fn to_proto(&self) -> proto::StatusEntry {
179 let simple_status = match self.status {
180 FileStatus::Ignored | FileStatus::Untracked => proto::GitStatus::Added as i32,
181 FileStatus::Unmerged { .. } => proto::GitStatus::Conflict as i32,
182 FileStatus::Tracked(TrackedStatus {
183 index_status,
184 worktree_status,
185 }) => tracked_status_to_proto(if worktree_status != StatusCode::Unmodified {
186 worktree_status
187 } else {
188 index_status
189 }),
190 };
191
192 proto::StatusEntry {
193 repo_path: self.repo_path.as_ref().to_proto(),
194 simple_status,
195 status: Some(status_to_proto(self.status)),
196 }
197 }
198}
199
200impl TryFrom<proto::StatusEntry> for StatusEntry {
201 type Error = anyhow::Error;
202
203 fn try_from(value: proto::StatusEntry) -> Result<Self, Self::Error> {
204 let repo_path = RepoPath(Arc::<Path>::from_proto(value.repo_path));
205 let status = status_from_proto(value.simple_status, value.status)?;
206 Ok(Self { repo_path, status })
207 }
208}
209
210impl sum_tree::Item for StatusEntry {
211 type Summary = PathSummary<GitSummary>;
212
213 fn summary(&self, _: &<Self::Summary as sum_tree::Summary>::Context) -> Self::Summary {
214 PathSummary {
215 max_path: self.repo_path.0.clone(),
216 item_summary: self.status.summary(),
217 }
218 }
219}
220
221impl sum_tree::KeyedItem for StatusEntry {
222 type Key = PathKey;
223
224 fn key(&self) -> Self::Key {
225 PathKey(self.repo_path.0.clone())
226 }
227}
228
/// Identifier of a repository within a `GitStore`; a thin wrapper around the
/// raw `u64` so ids cannot be confused with other integers.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct RepositoryId(pub u64);
231
232#[derive(Clone, Debug, Default, PartialEq, Eq)]
233pub struct MergeDetails {
234 pub conflicted_paths: TreeSet<RepoPath>,
235 pub message: Option<SharedString>,
236 pub apply_head: Option<CommitDetails>,
237 pub cherry_pick_head: Option<CommitDetails>,
238 pub merge_heads: Vec<CommitDetails>,
239 pub rebase_head: Option<CommitDetails>,
240 pub revert_head: Option<CommitDetails>,
241}
242
243#[derive(Clone, Debug, PartialEq, Eq)]
244pub struct RepositorySnapshot {
245 pub id: RepositoryId,
246 pub statuses_by_path: SumTree<StatusEntry>,
247 pub work_directory_abs_path: Arc<Path>,
248 pub branch: Option<Branch>,
249 pub head_commit: Option<CommitDetails>,
250 pub scan_id: u64,
251 pub merge: MergeDetails,
252}
253
/// Identifier of a job tracked in `Repository::active_jobs`.
type JobId = u64;
255
256#[derive(Clone, Debug, PartialEq, Eq)]
257pub struct JobInfo {
258 pub start: Instant,
259 pub message: SharedString,
260}
261
262pub struct Repository {
263 this: WeakEntity<Self>,
264 snapshot: RepositorySnapshot,
265 commit_message_buffer: Option<Entity<Buffer>>,
266 git_store: WeakEntity<GitStore>,
267 // For a local repository, holds paths that have had worktree events since the last status scan completed,
268 // and that should be examined during the next status scan.
269 paths_needing_status_update: BTreeSet<RepoPath>,
270 job_sender: mpsc::UnboundedSender<GitJob>,
271 active_jobs: HashMap<JobId, JobInfo>,
272 job_id: JobId,
273 askpass_delegates: Arc<Mutex<HashMap<u64, AskPassDelegate>>>,
274 latest_askpass_id: u64,
275}
276
277impl std::ops::Deref for Repository {
278 type Target = RepositorySnapshot;
279
280 fn deref(&self) -> &Self::Target {
281 &self.snapshot
282 }
283}
284
285#[derive(Clone)]
286pub enum RepositoryState {
287 Local {
288 backend: Arc<dyn GitRepository>,
289 environment: Arc<HashMap<String, String>>,
290 },
291 Remote {
292 project_id: ProjectId,
293 client: AnyProtoClient,
294 },
295}
296
/// Events emitted by a `Repository`.
#[derive(Clone, Debug)]
pub enum RepositoryEvent {
    /// The repository's state changed; `full_scan` is carried along for
    /// subscribers that distinguish full from partial scans.
    Updated { full_scan: bool },
    /// The repository's merge heads changed.
    MergeHeadsChanged,
}
302
/// Payload-free event that a `Repository` emits about its jobs.
#[derive(Clone, Debug)]
pub struct JobsUpdated;
305
306#[derive(Debug)]
307pub enum GitStoreEvent {
308 ActiveRepositoryChanged(Option<RepositoryId>),
309 RepositoryUpdated(RepositoryId, RepositoryEvent, bool),
310 RepositoryAdded(RepositoryId),
311 RepositoryRemoved(RepositoryId),
312 IndexWriteError(anyhow::Error),
313 JobsUpdated,
314 ConflictsUpdated,
315}
316
317impl EventEmitter<RepositoryEvent> for Repository {}
318impl EventEmitter<JobsUpdated> for Repository {}
319impl EventEmitter<GitStoreEvent> for GitStore {}
320
321pub struct GitJob {
322 job: Box<dyn FnOnce(RepositoryState, &mut AsyncApp) -> Task<()>>,
323 key: Option<GitJobKey>,
324}
325
326#[derive(PartialEq, Eq)]
327enum GitJobKey {
328 WriteIndex(RepoPath),
329 ReloadBufferDiffBases,
330 RefreshStatuses,
331 ReloadGitState,
332}
333
334impl GitStore {
335 pub fn local(
336 worktree_store: &Entity<WorktreeStore>,
337 buffer_store: Entity<BufferStore>,
338 environment: Entity<ProjectEnvironment>,
339 fs: Arc<dyn Fs>,
340 cx: &mut Context<Self>,
341 ) -> Self {
342 Self::new(
343 worktree_store.clone(),
344 buffer_store,
345 GitStoreState::Local {
346 next_repository_id: Arc::new(AtomicU64::new(1)),
347 downstream: None,
348 project_environment: environment,
349 fs,
350 },
351 cx,
352 )
353 }
354
355 pub fn remote(
356 worktree_store: &Entity<WorktreeStore>,
357 buffer_store: Entity<BufferStore>,
358 upstream_client: AnyProtoClient,
359 project_id: ProjectId,
360 cx: &mut Context<Self>,
361 ) -> Self {
362 Self::new(
363 worktree_store.clone(),
364 buffer_store,
365 GitStoreState::Remote {
366 upstream_client,
367 upstream_project_id: project_id,
368 },
369 cx,
370 )
371 }
372
373 pub fn ssh(
374 worktree_store: &Entity<WorktreeStore>,
375 buffer_store: Entity<BufferStore>,
376 upstream_client: AnyProtoClient,
377 cx: &mut Context<Self>,
378 ) -> Self {
379 Self::new(
380 worktree_store.clone(),
381 buffer_store,
382 GitStoreState::Ssh {
383 upstream_client,
384 upstream_project_id: ProjectId(SSH_PROJECT_ID),
385 downstream: None,
386 },
387 cx,
388 )
389 }
390
391 fn new(
392 worktree_store: Entity<WorktreeStore>,
393 buffer_store: Entity<BufferStore>,
394 state: GitStoreState,
395 cx: &mut Context<Self>,
396 ) -> Self {
397 let _subscriptions = vec![
398 cx.subscribe(&worktree_store, Self::on_worktree_store_event),
399 cx.subscribe(&buffer_store, Self::on_buffer_store_event),
400 ];
401
402 GitStore {
403 state,
404 buffer_store,
405 worktree_store,
406 repositories: HashMap::default(),
407 active_repo_id: None,
408 _subscriptions,
409 loading_diffs: HashMap::default(),
410 shared_diffs: HashMap::default(),
411 diffs: HashMap::default(),
412 }
413 }
414
415 pub fn init(client: &AnyProtoClient) {
416 client.add_entity_request_handler(Self::handle_get_remotes);
417 client.add_entity_request_handler(Self::handle_get_branches);
418 client.add_entity_request_handler(Self::handle_change_branch);
419 client.add_entity_request_handler(Self::handle_create_branch);
420 client.add_entity_request_handler(Self::handle_git_init);
421 client.add_entity_request_handler(Self::handle_push);
422 client.add_entity_request_handler(Self::handle_pull);
423 client.add_entity_request_handler(Self::handle_fetch);
424 client.add_entity_request_handler(Self::handle_stage);
425 client.add_entity_request_handler(Self::handle_unstage);
426 client.add_entity_request_handler(Self::handle_commit);
427 client.add_entity_request_handler(Self::handle_reset);
428 client.add_entity_request_handler(Self::handle_show);
429 client.add_entity_request_handler(Self::handle_load_commit_diff);
430 client.add_entity_request_handler(Self::handle_checkout_files);
431 client.add_entity_request_handler(Self::handle_open_commit_message_buffer);
432 client.add_entity_request_handler(Self::handle_set_index_text);
433 client.add_entity_request_handler(Self::handle_askpass);
434 client.add_entity_request_handler(Self::handle_check_for_pushed_commits);
435 client.add_entity_request_handler(Self::handle_git_diff);
436 client.add_entity_request_handler(Self::handle_open_unstaged_diff);
437 client.add_entity_request_handler(Self::handle_open_uncommitted_diff);
438 client.add_entity_message_handler(Self::handle_update_diff_bases);
439 client.add_entity_request_handler(Self::handle_get_permalink_to_line);
440 client.add_entity_request_handler(Self::handle_blame_buffer);
441 client.add_entity_message_handler(Self::handle_update_repository);
442 client.add_entity_message_handler(Self::handle_remove_repository);
443 }
444
445 pub fn is_local(&self) -> bool {
446 matches!(self.state, GitStoreState::Local { .. })
447 }
448
449 pub fn shared(&mut self, project_id: u64, client: AnyProtoClient, cx: &mut Context<Self>) {
450 match &mut self.state {
451 GitStoreState::Ssh {
452 downstream: downstream_client,
453 ..
454 } => {
455 for repo in self.repositories.values() {
456 let update = repo.read(cx).snapshot.initial_update(project_id);
457 for update in split_repository_update(update) {
458 client.send(update).log_err();
459 }
460 }
461 *downstream_client = Some((client, ProjectId(project_id)));
462 }
463 GitStoreState::Local {
464 downstream: downstream_client,
465 ..
466 } => {
467 let mut snapshots = HashMap::default();
468 let (updates_tx, mut updates_rx) = mpsc::unbounded();
469 for repo in self.repositories.values() {
470 updates_tx
471 .unbounded_send(DownstreamUpdate::UpdateRepository(
472 repo.read(cx).snapshot.clone(),
473 ))
474 .ok();
475 }
476 *downstream_client = Some(LocalDownstreamState {
477 client: client.clone(),
478 project_id: ProjectId(project_id),
479 updates_tx,
480 _task: cx.spawn(async move |this, cx| {
481 cx.background_spawn(async move {
482 while let Some(update) = updates_rx.next().await {
483 match update {
484 DownstreamUpdate::UpdateRepository(snapshot) => {
485 if let Some(old_snapshot) = snapshots.get_mut(&snapshot.id)
486 {
487 let update =
488 snapshot.build_update(old_snapshot, project_id);
489 *old_snapshot = snapshot;
490 for update in split_repository_update(update) {
491 client.send(update)?;
492 }
493 } else {
494 let update = snapshot.initial_update(project_id);
495 for update in split_repository_update(update) {
496 client.send(update)?;
497 }
498 snapshots.insert(snapshot.id, snapshot);
499 }
500 }
501 DownstreamUpdate::RemoveRepository(id) => {
502 client.send(proto::RemoveRepository {
503 project_id,
504 id: id.to_proto(),
505 })?;
506 }
507 }
508 }
509 anyhow::Ok(())
510 })
511 .await
512 .ok();
513 this.update(cx, |this, _| {
514 if let GitStoreState::Local {
515 downstream: downstream_client,
516 ..
517 } = &mut this.state
518 {
519 downstream_client.take();
520 } else {
521 unreachable!("unshared called on remote store");
522 }
523 })
524 }),
525 });
526 }
527 GitStoreState::Remote { .. } => {
528 debug_panic!("shared called on remote store");
529 }
530 }
531 }
532
533 pub fn unshared(&mut self, _cx: &mut Context<Self>) {
534 match &mut self.state {
535 GitStoreState::Local {
536 downstream: downstream_client,
537 ..
538 } => {
539 downstream_client.take();
540 }
541 GitStoreState::Ssh {
542 downstream: downstream_client,
543 ..
544 } => {
545 downstream_client.take();
546 }
547 GitStoreState::Remote { .. } => {
548 debug_panic!("unshared called on remote store");
549 }
550 }
551 self.shared_diffs.clear();
552 }
553
554 pub(crate) fn forget_shared_diffs_for(&mut self, peer_id: &proto::PeerId) {
555 self.shared_diffs.remove(peer_id);
556 }
557
558 pub fn active_repository(&self) -> Option<Entity<Repository>> {
559 self.active_repo_id
560 .as_ref()
561 .map(|id| self.repositories[&id].clone())
562 }
563
564 pub fn open_unstaged_diff(
565 &mut self,
566 buffer: Entity<Buffer>,
567 cx: &mut Context<Self>,
568 ) -> Task<Result<Entity<BufferDiff>>> {
569 let buffer_id = buffer.read(cx).remote_id();
570 if let Some(diff_state) = self.diffs.get(&buffer_id) {
571 if let Some(unstaged_diff) = diff_state
572 .read(cx)
573 .unstaged_diff
574 .as_ref()
575 .and_then(|weak| weak.upgrade())
576 {
577 if let Some(task) =
578 diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
579 {
580 return cx.background_executor().spawn(async move {
581 task.await;
582 Ok(unstaged_diff)
583 });
584 }
585 return Task::ready(Ok(unstaged_diff));
586 }
587 }
588
589 let Some((repo, repo_path)) =
590 self.repository_and_path_for_buffer_id(buffer.read(cx).remote_id(), cx)
591 else {
592 return Task::ready(Err(anyhow!("failed to find git repository for buffer")));
593 };
594
595 let task = self
596 .loading_diffs
597 .entry((buffer_id, DiffKind::Unstaged))
598 .or_insert_with(|| {
599 let staged_text = repo.update(cx, |repo, cx| {
600 repo.load_staged_text(buffer_id, repo_path, cx)
601 });
602 cx.spawn(async move |this, cx| {
603 Self::open_diff_internal(
604 this,
605 DiffKind::Unstaged,
606 staged_text.await.map(DiffBasesChange::SetIndex),
607 buffer,
608 cx,
609 )
610 .await
611 .map_err(Arc::new)
612 })
613 .shared()
614 })
615 .clone();
616
617 cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
618 }
619
620 pub fn open_uncommitted_diff(
621 &mut self,
622 buffer: Entity<Buffer>,
623 cx: &mut Context<Self>,
624 ) -> Task<Result<Entity<BufferDiff>>> {
625 let buffer_id = buffer.read(cx).remote_id();
626
627 if let Some(diff_state) = self.diffs.get(&buffer_id) {
628 if let Some(uncommitted_diff) = diff_state
629 .read(cx)
630 .uncommitted_diff
631 .as_ref()
632 .and_then(|weak| weak.upgrade())
633 {
634 if let Some(task) =
635 diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
636 {
637 return cx.background_executor().spawn(async move {
638 task.await;
639 Ok(uncommitted_diff)
640 });
641 }
642 return Task::ready(Ok(uncommitted_diff));
643 }
644 }
645
646 let Some((repo, repo_path)) =
647 self.repository_and_path_for_buffer_id(buffer.read(cx).remote_id(), cx)
648 else {
649 return Task::ready(Err(anyhow!("failed to find git repository for buffer")));
650 };
651
652 let task = self
653 .loading_diffs
654 .entry((buffer_id, DiffKind::Uncommitted))
655 .or_insert_with(|| {
656 let changes = repo.update(cx, |repo, cx| {
657 repo.load_committed_text(buffer_id, repo_path, cx)
658 });
659
660 cx.spawn(async move |this, cx| {
661 Self::open_diff_internal(this, DiffKind::Uncommitted, changes.await, buffer, cx)
662 .await
663 .map_err(Arc::new)
664 })
665 .shared()
666 })
667 .clone();
668
669 cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
670 }
671
672 async fn open_diff_internal(
673 this: WeakEntity<Self>,
674 kind: DiffKind,
675 texts: Result<DiffBasesChange>,
676 buffer_entity: Entity<Buffer>,
677 cx: &mut AsyncApp,
678 ) -> Result<Entity<BufferDiff>> {
679 let diff_bases_change = match texts {
680 Err(e) => {
681 this.update(cx, |this, cx| {
682 let buffer = buffer_entity.read(cx);
683 let buffer_id = buffer.remote_id();
684 this.loading_diffs.remove(&(buffer_id, kind));
685 })?;
686 return Err(e);
687 }
688 Ok(change) => change,
689 };
690
691 this.update(cx, |this, cx| {
692 let buffer = buffer_entity.read(cx);
693 let buffer_id = buffer.remote_id();
694 let language = buffer.language().cloned();
695 let language_registry = buffer.language_registry();
696 let text_snapshot = buffer.text_snapshot();
697 this.loading_diffs.remove(&(buffer_id, kind));
698
699 let git_store = cx.weak_entity();
700 let diff_state = this
701 .diffs
702 .entry(buffer_id)
703 .or_insert_with(|| cx.new(|_| BufferGitState::new(git_store)));
704
705 let diff = cx.new(|cx| BufferDiff::new(&text_snapshot, cx));
706
707 cx.subscribe(&diff, Self::on_buffer_diff_event).detach();
708 diff_state.update(cx, |diff_state, cx| {
709 diff_state.language = language;
710 diff_state.language_registry = language_registry;
711
712 match kind {
713 DiffKind::Unstaged => diff_state.unstaged_diff = Some(diff.downgrade()),
714 DiffKind::Uncommitted => {
715 let unstaged_diff = if let Some(diff) = diff_state.unstaged_diff() {
716 diff
717 } else {
718 let unstaged_diff = cx.new(|cx| BufferDiff::new(&text_snapshot, cx));
719 diff_state.unstaged_diff = Some(unstaged_diff.downgrade());
720 unstaged_diff
721 };
722
723 diff.update(cx, |diff, _| diff.set_secondary_diff(unstaged_diff));
724 diff_state.uncommitted_diff = Some(diff.downgrade())
725 }
726 }
727
728 diff_state.diff_bases_changed(text_snapshot, Some(diff_bases_change), cx);
729 let rx = diff_state.wait_for_recalculation();
730
731 anyhow::Ok(async move {
732 if let Some(rx) = rx {
733 rx.await;
734 }
735 Ok(diff)
736 })
737 })
738 })??
739 .await
740 }
741
742 pub fn get_unstaged_diff(&self, buffer_id: BufferId, cx: &App) -> Option<Entity<BufferDiff>> {
743 let diff_state = self.diffs.get(&buffer_id)?;
744 diff_state.read(cx).unstaged_diff.as_ref()?.upgrade()
745 }
746
747 pub fn get_uncommitted_diff(
748 &self,
749 buffer_id: BufferId,
750 cx: &App,
751 ) -> Option<Entity<BufferDiff>> {
752 let diff_state = self.diffs.get(&buffer_id)?;
753 diff_state.read(cx).uncommitted_diff.as_ref()?.upgrade()
754 }
755
756 pub fn open_conflict_set(
757 &mut self,
758 buffer: Entity<Buffer>,
759 cx: &mut Context<Self>,
760 ) -> Entity<ConflictSet> {
761 log::debug!("open conflict set");
762 let buffer_id = buffer.read(cx).remote_id();
763
764 if let Some(git_state) = self.diffs.get(&buffer_id) {
765 if let Some(conflict_set) = git_state
766 .read(cx)
767 .conflict_set
768 .as_ref()
769 .and_then(|weak| weak.upgrade())
770 {
771 let conflict_set = conflict_set.clone();
772 let buffer_snapshot = buffer.read(cx).text_snapshot();
773
774 git_state.update(cx, |state, cx| {
775 let _ = state.reparse_conflict_markers(buffer_snapshot, cx);
776 });
777
778 return conflict_set;
779 }
780 }
781
782 let is_unmerged = self
783 .repository_and_path_for_buffer_id(buffer_id, cx)
784 .map_or(false, |(repo, path)| {
785 repo.read(cx)
786 .snapshot
787 .merge
788 .conflicted_paths
789 .contains(&path)
790 });
791 let git_store = cx.weak_entity();
792 let buffer_git_state = self
793 .diffs
794 .entry(buffer_id)
795 .or_insert_with(|| cx.new(|_| BufferGitState::new(git_store)));
796 let conflict_set = cx.new(|cx| ConflictSet::new(buffer_id, is_unmerged, cx));
797
798 self._subscriptions
799 .push(cx.subscribe(&conflict_set, |_, _, _, cx| {
800 cx.emit(GitStoreEvent::ConflictsUpdated);
801 }));
802
803 buffer_git_state.update(cx, |state, cx| {
804 state.conflict_set = Some(conflict_set.downgrade());
805 let buffer_snapshot = buffer.read(cx).text_snapshot();
806 let _ = state.reparse_conflict_markers(buffer_snapshot, cx);
807 });
808
809 conflict_set
810 }
811
812 pub fn project_path_git_status(
813 &self,
814 project_path: &ProjectPath,
815 cx: &App,
816 ) -> Option<FileStatus> {
817 let (repo, repo_path) = self.repository_and_path_for_project_path(project_path, cx)?;
818 Some(repo.read(cx).status_for_path(&repo_path)?.status)
819 }
820
821 pub fn checkpoint(&self, cx: &mut App) -> Task<Result<GitStoreCheckpoint>> {
822 let mut work_directory_abs_paths = Vec::new();
823 let mut checkpoints = Vec::new();
824 for repository in self.repositories.values() {
825 repository.update(cx, |repository, _| {
826 work_directory_abs_paths.push(repository.snapshot.work_directory_abs_path.clone());
827 checkpoints.push(repository.checkpoint().map(|checkpoint| checkpoint?));
828 });
829 }
830
831 cx.background_executor().spawn(async move {
832 let checkpoints = future::try_join_all(checkpoints).await?;
833 Ok(GitStoreCheckpoint {
834 checkpoints_by_work_dir_abs_path: work_directory_abs_paths
835 .into_iter()
836 .zip(checkpoints)
837 .collect(),
838 })
839 })
840 }
841
842 pub fn restore_checkpoint(
843 &self,
844 checkpoint: GitStoreCheckpoint,
845 cx: &mut App,
846 ) -> Task<Result<()>> {
847 let repositories_by_work_dir_abs_path = self
848 .repositories
849 .values()
850 .map(|repo| (repo.read(cx).snapshot.work_directory_abs_path.clone(), repo))
851 .collect::<HashMap<_, _>>();
852
853 let mut tasks = Vec::new();
854 for (work_dir_abs_path, checkpoint) in checkpoint.checkpoints_by_work_dir_abs_path {
855 if let Some(repository) = repositories_by_work_dir_abs_path.get(&work_dir_abs_path) {
856 let restore = repository.update(cx, |repository, _| {
857 repository.restore_checkpoint(checkpoint)
858 });
859 tasks.push(async move { restore.await? });
860 }
861 }
862 cx.background_spawn(async move {
863 future::try_join_all(tasks).await?;
864 Ok(())
865 })
866 }
867
868 /// Compares two checkpoints, returning true if they are equal.
869 pub fn compare_checkpoints(
870 &self,
871 left: GitStoreCheckpoint,
872 mut right: GitStoreCheckpoint,
873 cx: &mut App,
874 ) -> Task<Result<bool>> {
875 let repositories_by_work_dir_abs_path = self
876 .repositories
877 .values()
878 .map(|repo| (repo.read(cx).snapshot.work_directory_abs_path.clone(), repo))
879 .collect::<HashMap<_, _>>();
880
881 let mut tasks = Vec::new();
882 for (work_dir_abs_path, left_checkpoint) in left.checkpoints_by_work_dir_abs_path {
883 if let Some(right_checkpoint) = right
884 .checkpoints_by_work_dir_abs_path
885 .remove(&work_dir_abs_path)
886 {
887 if let Some(repository) = repositories_by_work_dir_abs_path.get(&work_dir_abs_path)
888 {
889 let compare = repository.update(cx, |repository, _| {
890 repository.compare_checkpoints(left_checkpoint, right_checkpoint)
891 });
892
893 tasks.push(async move { compare.await? });
894 }
895 } else {
896 return Task::ready(Ok(false));
897 }
898 }
899 cx.background_spawn(async move {
900 Ok(future::try_join_all(tasks)
901 .await?
902 .into_iter()
903 .all(|result| result))
904 })
905 }
906
907 /// Blames a buffer.
908 pub fn blame_buffer(
909 &self,
910 buffer: &Entity<Buffer>,
911 version: Option<clock::Global>,
912 cx: &mut App,
913 ) -> Task<Result<Option<Blame>>> {
914 let buffer = buffer.read(cx);
915 let Some((repo, repo_path)) =
916 self.repository_and_path_for_buffer_id(buffer.remote_id(), cx)
917 else {
918 return Task::ready(Err(anyhow!("failed to find a git repository for buffer")));
919 };
920 let content = match &version {
921 Some(version) => buffer.rope_for_version(version).clone(),
922 None => buffer.as_rope().clone(),
923 };
924 let version = version.unwrap_or(buffer.version());
925 let buffer_id = buffer.remote_id();
926
927 let rx = repo.update(cx, |repo, _| {
928 repo.send_job(None, move |state, _| async move {
929 match state {
930 RepositoryState::Local { backend, .. } => backend
931 .blame(repo_path.clone(), content)
932 .await
933 .with_context(|| format!("Failed to blame {:?}", repo_path.0))
934 .map(Some),
935 RepositoryState::Remote { project_id, client } => {
936 let response = client
937 .request(proto::BlameBuffer {
938 project_id: project_id.to_proto(),
939 buffer_id: buffer_id.into(),
940 version: serialize_version(&version),
941 })
942 .await?;
943 Ok(deserialize_blame_buffer_response(response))
944 }
945 }
946 })
947 });
948
949 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
950 }
951
952 pub fn get_permalink_to_line(
953 &self,
954 buffer: &Entity<Buffer>,
955 selection: Range<u32>,
956 cx: &mut App,
957 ) -> Task<Result<url::Url>> {
958 let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
959 return Task::ready(Err(anyhow!("buffer has no file")));
960 };
961
962 let Some((repo, repo_path)) = self.repository_and_path_for_project_path(
963 &(file.worktree.read(cx).id(), file.path.clone()).into(),
964 cx,
965 ) else {
966 // If we're not in a Git repo, check whether this is a Rust source
967 // file in the Cargo registry (presumably opened with go-to-definition
968 // from a normal Rust file). If so, we can put together a permalink
969 // using crate metadata.
970 if buffer
971 .read(cx)
972 .language()
973 .is_none_or(|lang| lang.name() != "Rust".into())
974 {
975 return Task::ready(Err(anyhow!("no permalink available")));
976 }
977 let Some(file_path) = file.worktree.read(cx).absolutize(&file.path).ok() else {
978 return Task::ready(Err(anyhow!("no permalink available")));
979 };
980 return cx.spawn(async move |cx| {
981 let provider_registry = cx.update(GitHostingProviderRegistry::default_global)?;
982 get_permalink_in_rust_registry_src(provider_registry, file_path, selection)
983 .map_err(|_| anyhow!("no permalink available"))
984 });
985
986 // TODO remote case
987 };
988
989 let buffer_id = buffer.read(cx).remote_id();
990 let branch = repo.read(cx).branch.clone();
991 let remote = branch
992 .as_ref()
993 .and_then(|b| b.upstream.as_ref())
994 .and_then(|b| b.remote_name())
995 .unwrap_or("origin")
996 .to_string();
997
998 let rx = repo.update(cx, |repo, _| {
999 repo.send_job(None, move |state, cx| async move {
1000 match state {
1001 RepositoryState::Local { backend, .. } => {
1002 let origin_url = backend
1003 .remote_url(&remote)
1004 .ok_or_else(|| anyhow!("remote \"{remote}\" not found"))?;
1005
1006 let sha = backend
1007 .head_sha()
1008 .ok_or_else(|| anyhow!("failed to read HEAD SHA"))?;
1009
1010 let provider_registry =
1011 cx.update(GitHostingProviderRegistry::default_global)?;
1012
1013 let (provider, remote) =
1014 parse_git_remote_url(provider_registry, &origin_url)
1015 .ok_or_else(|| anyhow!("failed to parse Git remote URL"))?;
1016
1017 let path = repo_path
1018 .to_str()
1019 .ok_or_else(|| anyhow!("failed to convert path to string"))?;
1020
1021 Ok(provider.build_permalink(
1022 remote,
1023 BuildPermalinkParams {
1024 sha: &sha,
1025 path,
1026 selection: Some(selection),
1027 },
1028 ))
1029 }
1030 RepositoryState::Remote { project_id, client } => {
1031 let response = client
1032 .request(proto::GetPermalinkToLine {
1033 project_id: project_id.to_proto(),
1034 buffer_id: buffer_id.into(),
1035 selection: Some(proto::Range {
1036 start: selection.start as u64,
1037 end: selection.end as u64,
1038 }),
1039 })
1040 .await?;
1041
1042 url::Url::parse(&response.permalink).context("failed to parse permalink")
1043 }
1044 }
1045 })
1046 });
1047 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
1048 }
1049
1050 fn downstream_client(&self) -> Option<(AnyProtoClient, ProjectId)> {
1051 match &self.state {
1052 GitStoreState::Local {
1053 downstream: downstream_client,
1054 ..
1055 } => downstream_client
1056 .as_ref()
1057 .map(|state| (state.client.clone(), state.project_id)),
1058 GitStoreState::Ssh {
1059 downstream: downstream_client,
1060 ..
1061 } => downstream_client.clone(),
1062 GitStoreState::Remote { .. } => None,
1063 }
1064 }
1065
1066 fn upstream_client(&self) -> Option<AnyProtoClient> {
1067 match &self.state {
1068 GitStoreState::Local { .. } => None,
1069 GitStoreState::Ssh {
1070 upstream_client, ..
1071 }
1072 | GitStoreState::Remote {
1073 upstream_client, ..
1074 } => Some(upstream_client.clone()),
1075 }
1076 }
1077
1078 fn on_worktree_store_event(
1079 &mut self,
1080 worktree_store: Entity<WorktreeStore>,
1081 event: &WorktreeStoreEvent,
1082 cx: &mut Context<Self>,
1083 ) {
1084 let GitStoreState::Local {
1085 project_environment,
1086 downstream,
1087 next_repository_id,
1088 fs,
1089 } = &self.state
1090 else {
1091 return;
1092 };
1093
1094 match event {
1095 WorktreeStoreEvent::WorktreeUpdatedEntries(worktree_id, updated_entries) => {
1096 let mut paths_by_git_repo = HashMap::<_, Vec<_>>::default();
1097 for (relative_path, _, _) in updated_entries.iter() {
1098 let Some((repo, repo_path)) = self.repository_and_path_for_project_path(
1099 &(*worktree_id, relative_path.clone()).into(),
1100 cx,
1101 ) else {
1102 continue;
1103 };
1104 paths_by_git_repo.entry(repo).or_default().push(repo_path)
1105 }
1106
1107 for (repo, paths) in paths_by_git_repo {
1108 repo.update(cx, |repo, cx| {
1109 repo.paths_changed(
1110 paths,
1111 downstream
1112 .as_ref()
1113 .map(|downstream| downstream.updates_tx.clone()),
1114 cx,
1115 );
1116 });
1117 }
1118 }
1119 WorktreeStoreEvent::WorktreeUpdatedGitRepositories(worktree_id, changed_repos) => {
1120 let Some(worktree) = worktree_store.read(cx).worktree_for_id(*worktree_id, cx)
1121 else {
1122 return;
1123 };
1124 if !worktree.read(cx).is_visible() {
1125 log::debug!(
1126 "not adding repositories for local worktree {:?} because it's not visible",
1127 worktree.read(cx).abs_path()
1128 );
1129 return;
1130 }
1131 self.update_repositories_from_worktree(
1132 project_environment.clone(),
1133 next_repository_id.clone(),
1134 downstream
1135 .as_ref()
1136 .map(|downstream| downstream.updates_tx.clone()),
1137 changed_repos.clone(),
1138 fs.clone(),
1139 cx,
1140 );
1141 self.local_worktree_git_repos_changed(worktree, changed_repos, cx);
1142 }
1143 _ => {}
1144 }
1145 }
1146
1147 fn on_repository_event(
1148 &mut self,
1149 repo: Entity<Repository>,
1150 event: &RepositoryEvent,
1151 cx: &mut Context<Self>,
1152 ) {
1153 let id = repo.read(cx).id;
1154 let merge_conflicts = repo.read(cx).snapshot.merge.conflicted_paths.clone();
1155 for (buffer_id, diff) in self.diffs.iter() {
1156 if let Some((buffer_repo, repo_path)) =
1157 self.repository_and_path_for_buffer_id(*buffer_id, cx)
1158 {
1159 if buffer_repo == repo {
1160 diff.update(cx, |diff, cx| {
1161 if let Some(conflict_set) = &diff.conflict_set {
1162 let conflict_status_changed =
1163 conflict_set.update(cx, |conflict_set, cx| {
1164 let has_conflict = merge_conflicts.contains(&repo_path);
1165 conflict_set.set_has_conflict(has_conflict, cx)
1166 })?;
1167 if conflict_status_changed {
1168 let buffer_store = self.buffer_store.read(cx);
1169 if let Some(buffer) = buffer_store.get(*buffer_id) {
1170 let _ = diff.reparse_conflict_markers(
1171 buffer.read(cx).text_snapshot(),
1172 cx,
1173 );
1174 }
1175 }
1176 }
1177 anyhow::Ok(())
1178 })
1179 .ok();
1180 }
1181 }
1182 }
1183 cx.emit(GitStoreEvent::RepositoryUpdated(
1184 id,
1185 event.clone(),
1186 self.active_repo_id == Some(id),
1187 ))
1188 }
1189
1190 fn on_jobs_updated(&mut self, _: Entity<Repository>, _: &JobsUpdated, cx: &mut Context<Self>) {
1191 cx.emit(GitStoreEvent::JobsUpdated)
1192 }
1193
1194 /// Update our list of repositories and schedule git scans in response to a notification from a worktree,
1195 fn update_repositories_from_worktree(
1196 &mut self,
1197 project_environment: Entity<ProjectEnvironment>,
1198 next_repository_id: Arc<AtomicU64>,
1199 updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
1200 updated_git_repositories: UpdatedGitRepositoriesSet,
1201 fs: Arc<dyn Fs>,
1202 cx: &mut Context<Self>,
1203 ) {
1204 let mut removed_ids = Vec::new();
1205 for update in updated_git_repositories.iter() {
1206 if let Some((id, existing)) = self.repositories.iter().find(|(_, repo)| {
1207 let existing_work_directory_abs_path =
1208 repo.read(cx).work_directory_abs_path.clone();
1209 Some(&existing_work_directory_abs_path)
1210 == update.old_work_directory_abs_path.as_ref()
1211 || Some(&existing_work_directory_abs_path)
1212 == update.new_work_directory_abs_path.as_ref()
1213 }) {
1214 if let Some(new_work_directory_abs_path) =
1215 update.new_work_directory_abs_path.clone()
1216 {
1217 existing.update(cx, |existing, cx| {
1218 existing.snapshot.work_directory_abs_path = new_work_directory_abs_path;
1219 existing.schedule_scan(updates_tx.clone(), cx);
1220 });
1221 } else {
1222 removed_ids.push(*id);
1223 }
1224 } else if let UpdatedGitRepository {
1225 new_work_directory_abs_path: Some(work_directory_abs_path),
1226 dot_git_abs_path: Some(dot_git_abs_path),
1227 repository_dir_abs_path: Some(repository_dir_abs_path),
1228 common_dir_abs_path: Some(common_dir_abs_path),
1229 ..
1230 } = update
1231 {
1232 let id = RepositoryId(next_repository_id.fetch_add(1, atomic::Ordering::Release));
1233 let git_store = cx.weak_entity();
1234 let repo = cx.new(|cx| {
1235 let mut repo = Repository::local(
1236 id,
1237 work_directory_abs_path.clone(),
1238 dot_git_abs_path.clone(),
1239 repository_dir_abs_path.clone(),
1240 common_dir_abs_path.clone(),
1241 project_environment.downgrade(),
1242 fs.clone(),
1243 git_store,
1244 cx,
1245 );
1246 repo.schedule_scan(updates_tx.clone(), cx);
1247 repo
1248 });
1249 self._subscriptions
1250 .push(cx.subscribe(&repo, Self::on_repository_event));
1251 self._subscriptions
1252 .push(cx.subscribe(&repo, Self::on_jobs_updated));
1253 self.repositories.insert(id, repo);
1254 cx.emit(GitStoreEvent::RepositoryAdded(id));
1255 self.active_repo_id.get_or_insert_with(|| {
1256 cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
1257 id
1258 });
1259 }
1260 }
1261
1262 for id in removed_ids {
1263 if self.active_repo_id == Some(id) {
1264 self.active_repo_id = None;
1265 cx.emit(GitStoreEvent::ActiveRepositoryChanged(None));
1266 }
1267 self.repositories.remove(&id);
1268 if let Some(updates_tx) = updates_tx.as_ref() {
1269 updates_tx
1270 .unbounded_send(DownstreamUpdate::RemoveRepository(id))
1271 .ok();
1272 }
1273 }
1274 }
1275
1276 fn on_buffer_store_event(
1277 &mut self,
1278 _: Entity<BufferStore>,
1279 event: &BufferStoreEvent,
1280 cx: &mut Context<Self>,
1281 ) {
1282 match event {
1283 BufferStoreEvent::BufferAdded(buffer) => {
1284 cx.subscribe(&buffer, |this, buffer, event, cx| {
1285 if let BufferEvent::LanguageChanged = event {
1286 let buffer_id = buffer.read(cx).remote_id();
1287 if let Some(diff_state) = this.diffs.get(&buffer_id) {
1288 diff_state.update(cx, |diff_state, cx| {
1289 diff_state.buffer_language_changed(buffer, cx);
1290 });
1291 }
1292 }
1293 })
1294 .detach();
1295 }
1296 BufferStoreEvent::SharedBufferClosed(peer_id, buffer_id) => {
1297 if let Some(diffs) = self.shared_diffs.get_mut(peer_id) {
1298 diffs.remove(buffer_id);
1299 }
1300 }
1301 BufferStoreEvent::BufferDropped(buffer_id) => {
1302 self.diffs.remove(&buffer_id);
1303 for diffs in self.shared_diffs.values_mut() {
1304 diffs.remove(buffer_id);
1305 }
1306 }
1307
1308 _ => {}
1309 }
1310 }
1311
1312 pub fn recalculate_buffer_diffs(
1313 &mut self,
1314 buffers: Vec<Entity<Buffer>>,
1315 cx: &mut Context<Self>,
1316 ) -> impl Future<Output = ()> + use<> {
1317 let mut futures = Vec::new();
1318 for buffer in buffers {
1319 if let Some(diff_state) = self.diffs.get_mut(&buffer.read(cx).remote_id()) {
1320 let buffer = buffer.read(cx).text_snapshot();
1321 diff_state.update(cx, |diff_state, cx| {
1322 diff_state.recalculate_diffs(buffer.clone(), cx);
1323 futures.extend(diff_state.wait_for_recalculation().map(FutureExt::boxed));
1324 });
1325 futures.push(diff_state.update(cx, |diff_state, cx| {
1326 diff_state
1327 .reparse_conflict_markers(buffer, cx)
1328 .map(|_| {})
1329 .boxed()
1330 }));
1331 }
1332 }
1333 async move {
1334 futures::future::join_all(futures).await;
1335 }
1336 }
1337
1338 fn on_buffer_diff_event(
1339 &mut self,
1340 diff: Entity<buffer_diff::BufferDiff>,
1341 event: &BufferDiffEvent,
1342 cx: &mut Context<Self>,
1343 ) {
1344 if let BufferDiffEvent::HunksStagedOrUnstaged(new_index_text) = event {
1345 let buffer_id = diff.read(cx).buffer_id;
1346 if let Some(diff_state) = self.diffs.get(&buffer_id) {
1347 let hunk_staging_operation_count = diff_state.update(cx, |diff_state, _| {
1348 diff_state.hunk_staging_operation_count += 1;
1349 diff_state.hunk_staging_operation_count
1350 });
1351 if let Some((repo, path)) = self.repository_and_path_for_buffer_id(buffer_id, cx) {
1352 let recv = repo.update(cx, |repo, cx| {
1353 log::debug!("hunks changed for {}", path.display());
1354 repo.spawn_set_index_text_job(
1355 path,
1356 new_index_text.as_ref().map(|rope| rope.to_string()),
1357 Some(hunk_staging_operation_count),
1358 cx,
1359 )
1360 });
1361 let diff = diff.downgrade();
1362 cx.spawn(async move |this, cx| {
1363 if let Ok(Err(error)) = cx.background_spawn(recv).await {
1364 diff.update(cx, |diff, cx| {
1365 diff.clear_pending_hunks(cx);
1366 })
1367 .ok();
1368 this.update(cx, |_, cx| cx.emit(GitStoreEvent::IndexWriteError(error)))
1369 .ok();
1370 }
1371 })
1372 .detach();
1373 }
1374 }
1375 }
1376 }
1377
1378 fn local_worktree_git_repos_changed(
1379 &mut self,
1380 worktree: Entity<Worktree>,
1381 changed_repos: &UpdatedGitRepositoriesSet,
1382 cx: &mut Context<Self>,
1383 ) {
1384 log::debug!("local worktree repos changed");
1385 debug_assert!(worktree.read(cx).is_local());
1386
1387 for repository in self.repositories.values() {
1388 repository.update(cx, |repository, cx| {
1389 let repo_abs_path = &repository.work_directory_abs_path;
1390 if changed_repos.iter().any(|update| {
1391 update.old_work_directory_abs_path.as_ref() == Some(&repo_abs_path)
1392 || update.new_work_directory_abs_path.as_ref() == Some(&repo_abs_path)
1393 }) {
1394 repository.reload_buffer_diff_bases(cx);
1395 }
1396 });
1397 }
1398 }
1399
1400 pub fn repositories(&self) -> &HashMap<RepositoryId, Entity<Repository>> {
1401 &self.repositories
1402 }
1403
1404 pub fn status_for_buffer_id(&self, buffer_id: BufferId, cx: &App) -> Option<FileStatus> {
1405 let (repo, path) = self.repository_and_path_for_buffer_id(buffer_id, cx)?;
1406 let status = repo.read(cx).snapshot.status_for_path(&path)?;
1407 Some(status.status)
1408 }
1409
1410 pub fn repository_and_path_for_buffer_id(
1411 &self,
1412 buffer_id: BufferId,
1413 cx: &App,
1414 ) -> Option<(Entity<Repository>, RepoPath)> {
1415 let buffer = self.buffer_store.read(cx).get(buffer_id)?;
1416 let project_path = buffer.read(cx).project_path(cx)?;
1417 self.repository_and_path_for_project_path(&project_path, cx)
1418 }
1419
1420 pub fn repository_and_path_for_project_path(
1421 &self,
1422 path: &ProjectPath,
1423 cx: &App,
1424 ) -> Option<(Entity<Repository>, RepoPath)> {
1425 let abs_path = self.worktree_store.read(cx).absolutize(path, cx)?;
1426 self.repositories
1427 .values()
1428 .filter_map(|repo| {
1429 let repo_path = repo.read(cx).abs_path_to_repo_path(&abs_path)?;
1430 Some((repo.clone(), repo_path))
1431 })
1432 .max_by_key(|(repo, _)| repo.read(cx).work_directory_abs_path.clone())
1433 }
1434
1435 pub fn git_init(
1436 &self,
1437 path: Arc<Path>,
1438 fallback_branch_name: String,
1439 cx: &App,
1440 ) -> Task<Result<()>> {
1441 match &self.state {
1442 GitStoreState::Local { fs, .. } => {
1443 let fs = fs.clone();
1444 cx.background_executor()
1445 .spawn(async move { fs.git_init(&path, fallback_branch_name) })
1446 }
1447 GitStoreState::Ssh {
1448 upstream_client,
1449 upstream_project_id: project_id,
1450 ..
1451 }
1452 | GitStoreState::Remote {
1453 upstream_client,
1454 upstream_project_id: project_id,
1455 ..
1456 } => {
1457 let client = upstream_client.clone();
1458 let project_id = *project_id;
1459 cx.background_executor().spawn(async move {
1460 client
1461 .request(proto::GitInit {
1462 project_id: project_id.0,
1463 abs_path: path.to_string_lossy().to_string(),
1464 fallback_branch_name,
1465 })
1466 .await?;
1467 Ok(())
1468 })
1469 }
1470 }
1471 }
1472
1473 async fn handle_update_repository(
1474 this: Entity<Self>,
1475 envelope: TypedEnvelope<proto::UpdateRepository>,
1476 mut cx: AsyncApp,
1477 ) -> Result<()> {
1478 this.update(&mut cx, |this, cx| {
1479 let mut update = envelope.payload;
1480
1481 let id = RepositoryId::from_proto(update.id);
1482 let client = this
1483 .upstream_client()
1484 .context("no upstream client")?
1485 .clone();
1486
1487 let mut is_new = false;
1488 let repo = this.repositories.entry(id).or_insert_with(|| {
1489 is_new = true;
1490 let git_store = cx.weak_entity();
1491 cx.new(|cx| {
1492 Repository::remote(
1493 id,
1494 Path::new(&update.abs_path).into(),
1495 ProjectId(update.project_id),
1496 client,
1497 git_store,
1498 cx,
1499 )
1500 })
1501 });
1502 if is_new {
1503 this._subscriptions
1504 .push(cx.subscribe(&repo, Self::on_repository_event))
1505 }
1506
1507 repo.update(cx, {
1508 let update = update.clone();
1509 |repo, cx| repo.apply_remote_update(update, cx)
1510 })?;
1511
1512 this.active_repo_id.get_or_insert_with(|| {
1513 cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
1514 id
1515 });
1516
1517 if let Some((client, project_id)) = this.downstream_client() {
1518 update.project_id = project_id.to_proto();
1519 client.send(update).log_err();
1520 }
1521 Ok(())
1522 })?
1523 }
1524
1525 async fn handle_remove_repository(
1526 this: Entity<Self>,
1527 envelope: TypedEnvelope<proto::RemoveRepository>,
1528 mut cx: AsyncApp,
1529 ) -> Result<()> {
1530 this.update(&mut cx, |this, cx| {
1531 let mut update = envelope.payload;
1532 let id = RepositoryId::from_proto(update.id);
1533 this.repositories.remove(&id);
1534 if let Some((client, project_id)) = this.downstream_client() {
1535 update.project_id = project_id.to_proto();
1536 client.send(update).log_err();
1537 }
1538 if this.active_repo_id == Some(id) {
1539 this.active_repo_id = None;
1540 cx.emit(GitStoreEvent::ActiveRepositoryChanged(None));
1541 }
1542 cx.emit(GitStoreEvent::RepositoryRemoved(id));
1543 })
1544 }
1545
1546 async fn handle_git_init(
1547 this: Entity<Self>,
1548 envelope: TypedEnvelope<proto::GitInit>,
1549 cx: AsyncApp,
1550 ) -> Result<proto::Ack> {
1551 let path: Arc<Path> = PathBuf::from(envelope.payload.abs_path).into();
1552 let name = envelope.payload.fallback_branch_name;
1553 cx.update(|cx| this.read(cx).git_init(path, name, cx))?
1554 .await?;
1555
1556 Ok(proto::Ack {})
1557 }
1558
1559 async fn handle_fetch(
1560 this: Entity<Self>,
1561 envelope: TypedEnvelope<proto::Fetch>,
1562 mut cx: AsyncApp,
1563 ) -> Result<proto::RemoteMessageResponse> {
1564 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1565 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1566 let askpass_id = envelope.payload.askpass_id;
1567
1568 let askpass = make_remote_delegate(
1569 this,
1570 envelope.payload.project_id,
1571 repository_id,
1572 askpass_id,
1573 &mut cx,
1574 );
1575
1576 let remote_output = repository_handle
1577 .update(&mut cx, |repository_handle, cx| {
1578 repository_handle.fetch(askpass, cx)
1579 })?
1580 .await??;
1581
1582 Ok(proto::RemoteMessageResponse {
1583 stdout: remote_output.stdout,
1584 stderr: remote_output.stderr,
1585 })
1586 }
1587
1588 async fn handle_push(
1589 this: Entity<Self>,
1590 envelope: TypedEnvelope<proto::Push>,
1591 mut cx: AsyncApp,
1592 ) -> Result<proto::RemoteMessageResponse> {
1593 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1594 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1595
1596 let askpass_id = envelope.payload.askpass_id;
1597 let askpass = make_remote_delegate(
1598 this,
1599 envelope.payload.project_id,
1600 repository_id,
1601 askpass_id,
1602 &mut cx,
1603 );
1604
1605 let options = envelope
1606 .payload
1607 .options
1608 .as_ref()
1609 .map(|_| match envelope.payload.options() {
1610 proto::push::PushOptions::SetUpstream => git::repository::PushOptions::SetUpstream,
1611 proto::push::PushOptions::Force => git::repository::PushOptions::Force,
1612 });
1613
1614 let branch_name = envelope.payload.branch_name.into();
1615 let remote_name = envelope.payload.remote_name.into();
1616
1617 let remote_output = repository_handle
1618 .update(&mut cx, |repository_handle, cx| {
1619 repository_handle.push(branch_name, remote_name, options, askpass, cx)
1620 })?
1621 .await??;
1622 Ok(proto::RemoteMessageResponse {
1623 stdout: remote_output.stdout,
1624 stderr: remote_output.stderr,
1625 })
1626 }
1627
1628 async fn handle_pull(
1629 this: Entity<Self>,
1630 envelope: TypedEnvelope<proto::Pull>,
1631 mut cx: AsyncApp,
1632 ) -> Result<proto::RemoteMessageResponse> {
1633 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1634 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1635 let askpass_id = envelope.payload.askpass_id;
1636 let askpass = make_remote_delegate(
1637 this,
1638 envelope.payload.project_id,
1639 repository_id,
1640 askpass_id,
1641 &mut cx,
1642 );
1643
1644 let branch_name = envelope.payload.branch_name.into();
1645 let remote_name = envelope.payload.remote_name.into();
1646
1647 let remote_message = repository_handle
1648 .update(&mut cx, |repository_handle, cx| {
1649 repository_handle.pull(branch_name, remote_name, askpass, cx)
1650 })?
1651 .await??;
1652
1653 Ok(proto::RemoteMessageResponse {
1654 stdout: remote_message.stdout,
1655 stderr: remote_message.stderr,
1656 })
1657 }
1658
1659 async fn handle_stage(
1660 this: Entity<Self>,
1661 envelope: TypedEnvelope<proto::Stage>,
1662 mut cx: AsyncApp,
1663 ) -> Result<proto::Ack> {
1664 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1665 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1666
1667 let entries = envelope
1668 .payload
1669 .paths
1670 .into_iter()
1671 .map(PathBuf::from)
1672 .map(RepoPath::new)
1673 .collect();
1674
1675 repository_handle
1676 .update(&mut cx, |repository_handle, cx| {
1677 repository_handle.stage_entries(entries, cx)
1678 })?
1679 .await?;
1680 Ok(proto::Ack {})
1681 }
1682
1683 async fn handle_unstage(
1684 this: Entity<Self>,
1685 envelope: TypedEnvelope<proto::Unstage>,
1686 mut cx: AsyncApp,
1687 ) -> Result<proto::Ack> {
1688 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1689 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1690
1691 let entries = envelope
1692 .payload
1693 .paths
1694 .into_iter()
1695 .map(PathBuf::from)
1696 .map(RepoPath::new)
1697 .collect();
1698
1699 repository_handle
1700 .update(&mut cx, |repository_handle, cx| {
1701 repository_handle.unstage_entries(entries, cx)
1702 })?
1703 .await?;
1704
1705 Ok(proto::Ack {})
1706 }
1707
1708 async fn handle_set_index_text(
1709 this: Entity<Self>,
1710 envelope: TypedEnvelope<proto::SetIndexText>,
1711 mut cx: AsyncApp,
1712 ) -> Result<proto::Ack> {
1713 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1714 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1715 let repo_path = RepoPath::from_str(&envelope.payload.path);
1716
1717 repository_handle
1718 .update(&mut cx, |repository_handle, cx| {
1719 repository_handle.spawn_set_index_text_job(
1720 repo_path,
1721 envelope.payload.text,
1722 None,
1723 cx,
1724 )
1725 })?
1726 .await??;
1727 Ok(proto::Ack {})
1728 }
1729
1730 async fn handle_commit(
1731 this: Entity<Self>,
1732 envelope: TypedEnvelope<proto::Commit>,
1733 mut cx: AsyncApp,
1734 ) -> Result<proto::Ack> {
1735 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1736 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1737
1738 let message = SharedString::from(envelope.payload.message);
1739 let name = envelope.payload.name.map(SharedString::from);
1740 let email = envelope.payload.email.map(SharedString::from);
1741 let options = envelope.payload.options.unwrap_or_default();
1742
1743 repository_handle
1744 .update(&mut cx, |repository_handle, cx| {
1745 repository_handle.commit(
1746 message,
1747 name.zip(email),
1748 CommitOptions {
1749 amend: options.amend,
1750 },
1751 cx,
1752 )
1753 })?
1754 .await??;
1755 Ok(proto::Ack {})
1756 }
1757
1758 async fn handle_get_remotes(
1759 this: Entity<Self>,
1760 envelope: TypedEnvelope<proto::GetRemotes>,
1761 mut cx: AsyncApp,
1762 ) -> Result<proto::GetRemotesResponse> {
1763 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1764 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1765
1766 let branch_name = envelope.payload.branch_name;
1767
1768 let remotes = repository_handle
1769 .update(&mut cx, |repository_handle, _| {
1770 repository_handle.get_remotes(branch_name)
1771 })?
1772 .await??;
1773
1774 Ok(proto::GetRemotesResponse {
1775 remotes: remotes
1776 .into_iter()
1777 .map(|remotes| proto::get_remotes_response::Remote {
1778 name: remotes.name.to_string(),
1779 })
1780 .collect::<Vec<_>>(),
1781 })
1782 }
1783
1784 async fn handle_get_branches(
1785 this: Entity<Self>,
1786 envelope: TypedEnvelope<proto::GitGetBranches>,
1787 mut cx: AsyncApp,
1788 ) -> Result<proto::GitBranchesResponse> {
1789 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1790 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1791
1792 let branches = repository_handle
1793 .update(&mut cx, |repository_handle, _| repository_handle.branches())?
1794 .await??;
1795
1796 Ok(proto::GitBranchesResponse {
1797 branches: branches
1798 .into_iter()
1799 .map(|branch| branch_to_proto(&branch))
1800 .collect::<Vec<_>>(),
1801 })
1802 }
1803 async fn handle_create_branch(
1804 this: Entity<Self>,
1805 envelope: TypedEnvelope<proto::GitCreateBranch>,
1806 mut cx: AsyncApp,
1807 ) -> Result<proto::Ack> {
1808 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1809 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1810 let branch_name = envelope.payload.branch_name;
1811
1812 repository_handle
1813 .update(&mut cx, |repository_handle, _| {
1814 repository_handle.create_branch(branch_name)
1815 })?
1816 .await??;
1817
1818 Ok(proto::Ack {})
1819 }
1820
1821 async fn handle_change_branch(
1822 this: Entity<Self>,
1823 envelope: TypedEnvelope<proto::GitChangeBranch>,
1824 mut cx: AsyncApp,
1825 ) -> Result<proto::Ack> {
1826 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1827 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1828 let branch_name = envelope.payload.branch_name;
1829
1830 repository_handle
1831 .update(&mut cx, |repository_handle, _| {
1832 repository_handle.change_branch(branch_name)
1833 })?
1834 .await??;
1835
1836 Ok(proto::Ack {})
1837 }
1838
1839 async fn handle_show(
1840 this: Entity<Self>,
1841 envelope: TypedEnvelope<proto::GitShow>,
1842 mut cx: AsyncApp,
1843 ) -> Result<proto::GitCommitDetails> {
1844 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1845 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1846
1847 let commit = repository_handle
1848 .update(&mut cx, |repository_handle, _| {
1849 repository_handle.show(envelope.payload.commit)
1850 })?
1851 .await??;
1852 Ok(proto::GitCommitDetails {
1853 sha: commit.sha.into(),
1854 message: commit.message.into(),
1855 commit_timestamp: commit.commit_timestamp,
1856 author_email: commit.author_email.into(),
1857 author_name: commit.author_name.into(),
1858 })
1859 }
1860
1861 async fn handle_load_commit_diff(
1862 this: Entity<Self>,
1863 envelope: TypedEnvelope<proto::LoadCommitDiff>,
1864 mut cx: AsyncApp,
1865 ) -> Result<proto::LoadCommitDiffResponse> {
1866 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1867 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1868
1869 let commit_diff = repository_handle
1870 .update(&mut cx, |repository_handle, _| {
1871 repository_handle.load_commit_diff(envelope.payload.commit)
1872 })?
1873 .await??;
1874 Ok(proto::LoadCommitDiffResponse {
1875 files: commit_diff
1876 .files
1877 .into_iter()
1878 .map(|file| proto::CommitFile {
1879 path: file.path.to_string(),
1880 old_text: file.old_text,
1881 new_text: file.new_text,
1882 })
1883 .collect(),
1884 })
1885 }
1886
1887 async fn handle_reset(
1888 this: Entity<Self>,
1889 envelope: TypedEnvelope<proto::GitReset>,
1890 mut cx: AsyncApp,
1891 ) -> Result<proto::Ack> {
1892 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1893 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1894
1895 let mode = match envelope.payload.mode() {
1896 git_reset::ResetMode::Soft => ResetMode::Soft,
1897 git_reset::ResetMode::Mixed => ResetMode::Mixed,
1898 };
1899
1900 repository_handle
1901 .update(&mut cx, |repository_handle, cx| {
1902 repository_handle.reset(envelope.payload.commit, mode, cx)
1903 })?
1904 .await??;
1905 Ok(proto::Ack {})
1906 }
1907
1908 async fn handle_checkout_files(
1909 this: Entity<Self>,
1910 envelope: TypedEnvelope<proto::GitCheckoutFiles>,
1911 mut cx: AsyncApp,
1912 ) -> Result<proto::Ack> {
1913 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1914 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1915 let paths = envelope
1916 .payload
1917 .paths
1918 .iter()
1919 .map(|s| RepoPath::from_str(s))
1920 .collect();
1921
1922 repository_handle
1923 .update(&mut cx, |repository_handle, cx| {
1924 repository_handle.checkout_files(&envelope.payload.commit, paths, cx)
1925 })?
1926 .await??;
1927 Ok(proto::Ack {})
1928 }
1929
1930 async fn handle_open_commit_message_buffer(
1931 this: Entity<Self>,
1932 envelope: TypedEnvelope<proto::OpenCommitMessageBuffer>,
1933 mut cx: AsyncApp,
1934 ) -> Result<proto::OpenBufferResponse> {
1935 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1936 let repository = Self::repository_for_request(&this, repository_id, &mut cx)?;
1937 let buffer = repository
1938 .update(&mut cx, |repository, cx| {
1939 repository.open_commit_buffer(None, this.read(cx).buffer_store.clone(), cx)
1940 })?
1941 .await?;
1942
1943 let buffer_id = buffer.read_with(&cx, |buffer, _| buffer.remote_id())?;
1944 this.update(&mut cx, |this, cx| {
1945 this.buffer_store.update(cx, |buffer_store, cx| {
1946 buffer_store
1947 .create_buffer_for_peer(
1948 &buffer,
1949 envelope.original_sender_id.unwrap_or(envelope.sender_id),
1950 cx,
1951 )
1952 .detach_and_log_err(cx);
1953 })
1954 })?;
1955
1956 Ok(proto::OpenBufferResponse {
1957 buffer_id: buffer_id.to_proto(),
1958 })
1959 }
1960
1961 async fn handle_askpass(
1962 this: Entity<Self>,
1963 envelope: TypedEnvelope<proto::AskPassRequest>,
1964 mut cx: AsyncApp,
1965 ) -> Result<proto::AskPassResponse> {
1966 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1967 let repository = Self::repository_for_request(&this, repository_id, &mut cx)?;
1968
1969 let delegates = cx.update(|cx| repository.read(cx).askpass_delegates.clone())?;
1970 let Some(mut askpass) = delegates.lock().remove(&envelope.payload.askpass_id) else {
1971 debug_panic!("no askpass found");
1972 return Err(anyhow::anyhow!("no askpass found"));
1973 };
1974
1975 let response = askpass.ask_password(envelope.payload.prompt).await?;
1976
1977 delegates
1978 .lock()
1979 .insert(envelope.payload.askpass_id, askpass);
1980
1981 Ok(proto::AskPassResponse { response })
1982 }
1983
1984 async fn handle_check_for_pushed_commits(
1985 this: Entity<Self>,
1986 envelope: TypedEnvelope<proto::CheckForPushedCommits>,
1987 mut cx: AsyncApp,
1988 ) -> Result<proto::CheckForPushedCommitsResponse> {
1989 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1990 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1991
1992 let branches = repository_handle
1993 .update(&mut cx, |repository_handle, _| {
1994 repository_handle.check_for_pushed_commits()
1995 })?
1996 .await??;
1997 Ok(proto::CheckForPushedCommitsResponse {
1998 pushed_to: branches
1999 .into_iter()
2000 .map(|commit| commit.to_string())
2001 .collect(),
2002 })
2003 }
2004
2005 async fn handle_git_diff(
2006 this: Entity<Self>,
2007 envelope: TypedEnvelope<proto::GitDiff>,
2008 mut cx: AsyncApp,
2009 ) -> Result<proto::GitDiffResponse> {
2010 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2011 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2012 let diff_type = match envelope.payload.diff_type() {
2013 proto::git_diff::DiffType::HeadToIndex => DiffType::HeadToIndex,
2014 proto::git_diff::DiffType::HeadToWorktree => DiffType::HeadToWorktree,
2015 };
2016
2017 let mut diff = repository_handle
2018 .update(&mut cx, |repository_handle, cx| {
2019 repository_handle.diff(diff_type, cx)
2020 })?
2021 .await??;
2022 const ONE_MB: usize = 1_000_000;
2023 if diff.len() > ONE_MB {
2024 diff = diff.chars().take(ONE_MB).collect()
2025 }
2026
2027 Ok(proto::GitDiffResponse { diff })
2028 }
2029
2030 async fn handle_open_unstaged_diff(
2031 this: Entity<Self>,
2032 request: TypedEnvelope<proto::OpenUnstagedDiff>,
2033 mut cx: AsyncApp,
2034 ) -> Result<proto::OpenUnstagedDiffResponse> {
2035 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2036 let diff = this
2037 .update(&mut cx, |this, cx| {
2038 let buffer = this.buffer_store.read(cx).get(buffer_id)?;
2039 Some(this.open_unstaged_diff(buffer, cx))
2040 })?
2041 .ok_or_else(|| anyhow!("no such buffer"))?
2042 .await?;
2043 this.update(&mut cx, |this, _| {
2044 let shared_diffs = this
2045 .shared_diffs
2046 .entry(request.original_sender_id.unwrap_or(request.sender_id))
2047 .or_default();
2048 shared_diffs.entry(buffer_id).or_default().unstaged = Some(diff.clone());
2049 })?;
2050 let staged_text = diff.read_with(&cx, |diff, _| diff.base_text_string())?;
2051 Ok(proto::OpenUnstagedDiffResponse { staged_text })
2052 }
2053
2054 async fn handle_open_uncommitted_diff(
2055 this: Entity<Self>,
2056 request: TypedEnvelope<proto::OpenUncommittedDiff>,
2057 mut cx: AsyncApp,
2058 ) -> Result<proto::OpenUncommittedDiffResponse> {
2059 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2060 let diff = this
2061 .update(&mut cx, |this, cx| {
2062 let buffer = this.buffer_store.read(cx).get(buffer_id)?;
2063 Some(this.open_uncommitted_diff(buffer, cx))
2064 })?
2065 .ok_or_else(|| anyhow!("no such buffer"))?
2066 .await?;
2067 this.update(&mut cx, |this, _| {
2068 let shared_diffs = this
2069 .shared_diffs
2070 .entry(request.original_sender_id.unwrap_or(request.sender_id))
2071 .or_default();
2072 shared_diffs.entry(buffer_id).or_default().uncommitted = Some(diff.clone());
2073 })?;
2074 diff.read_with(&cx, |diff, cx| {
2075 use proto::open_uncommitted_diff_response::Mode;
2076
2077 let unstaged_diff = diff.secondary_diff();
2078 let index_snapshot = unstaged_diff.and_then(|diff| {
2079 let diff = diff.read(cx);
2080 diff.base_text_exists().then(|| diff.base_text())
2081 });
2082
2083 let mode;
2084 let staged_text;
2085 let committed_text;
2086 if diff.base_text_exists() {
2087 let committed_snapshot = diff.base_text();
2088 committed_text = Some(committed_snapshot.text());
2089 if let Some(index_text) = index_snapshot {
2090 if index_text.remote_id() == committed_snapshot.remote_id() {
2091 mode = Mode::IndexMatchesHead;
2092 staged_text = None;
2093 } else {
2094 mode = Mode::IndexAndHead;
2095 staged_text = Some(index_text.text());
2096 }
2097 } else {
2098 mode = Mode::IndexAndHead;
2099 staged_text = None;
2100 }
2101 } else {
2102 mode = Mode::IndexAndHead;
2103 committed_text = None;
2104 staged_text = index_snapshot.as_ref().map(|buffer| buffer.text());
2105 }
2106
2107 proto::OpenUncommittedDiffResponse {
2108 committed_text,
2109 staged_text,
2110 mode: mode.into(),
2111 }
2112 })
2113 }
2114
2115 async fn handle_update_diff_bases(
2116 this: Entity<Self>,
2117 request: TypedEnvelope<proto::UpdateDiffBases>,
2118 mut cx: AsyncApp,
2119 ) -> Result<()> {
2120 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2121 this.update(&mut cx, |this, cx| {
2122 if let Some(diff_state) = this.diffs.get_mut(&buffer_id) {
2123 if let Some(buffer) = this.buffer_store.read(cx).get(buffer_id) {
2124 let buffer = buffer.read(cx).text_snapshot();
2125 diff_state.update(cx, |diff_state, cx| {
2126 diff_state.handle_base_texts_updated(buffer, request.payload, cx);
2127 })
2128 }
2129 }
2130 })
2131 }
2132
2133 async fn handle_blame_buffer(
2134 this: Entity<Self>,
2135 envelope: TypedEnvelope<proto::BlameBuffer>,
2136 mut cx: AsyncApp,
2137 ) -> Result<proto::BlameBufferResponse> {
2138 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2139 let version = deserialize_version(&envelope.payload.version);
2140 let buffer = this.read_with(&cx, |this, cx| {
2141 this.buffer_store.read(cx).get_existing(buffer_id)
2142 })??;
2143 buffer
2144 .update(&mut cx, |buffer, _| {
2145 buffer.wait_for_version(version.clone())
2146 })?
2147 .await?;
2148 let blame = this
2149 .update(&mut cx, |this, cx| {
2150 this.blame_buffer(&buffer, Some(version), cx)
2151 })?
2152 .await?;
2153 Ok(serialize_blame_buffer_response(blame))
2154 }
2155
2156 async fn handle_get_permalink_to_line(
2157 this: Entity<Self>,
2158 envelope: TypedEnvelope<proto::GetPermalinkToLine>,
2159 mut cx: AsyncApp,
2160 ) -> Result<proto::GetPermalinkToLineResponse> {
2161 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2162 // let version = deserialize_version(&envelope.payload.version);
2163 let selection = {
2164 let proto_selection = envelope
2165 .payload
2166 .selection
2167 .context("no selection to get permalink for defined")?;
2168 proto_selection.start as u32..proto_selection.end as u32
2169 };
2170 let buffer = this.read_with(&cx, |this, cx| {
2171 this.buffer_store.read(cx).get_existing(buffer_id)
2172 })??;
2173 let permalink = this
2174 .update(&mut cx, |this, cx| {
2175 this.get_permalink_to_line(&buffer, selection, cx)
2176 })?
2177 .await?;
2178 Ok(proto::GetPermalinkToLineResponse {
2179 permalink: permalink.to_string(),
2180 })
2181 }
2182
2183 fn repository_for_request(
2184 this: &Entity<Self>,
2185 id: RepositoryId,
2186 cx: &mut AsyncApp,
2187 ) -> Result<Entity<Repository>> {
2188 this.update(cx, |this, _| {
2189 this.repositories
2190 .get(&id)
2191 .context("missing repository handle")
2192 .cloned()
2193 })?
2194 }
2195
2196 pub fn repo_snapshots(&self, cx: &App) -> HashMap<RepositoryId, RepositorySnapshot> {
2197 self.repositories
2198 .iter()
2199 .map(|(id, repo)| (*id, repo.read(cx).snapshot.clone()))
2200 .collect()
2201 }
2202}
2203
2204impl BufferGitState {
2205 fn new(_git_store: WeakEntity<GitStore>) -> Self {
2206 Self {
2207 unstaged_diff: Default::default(),
2208 uncommitted_diff: Default::default(),
2209 recalculate_diff_task: Default::default(),
2210 language: Default::default(),
2211 language_registry: Default::default(),
2212 recalculating_tx: postage::watch::channel_with(false).0,
2213 hunk_staging_operation_count: 0,
2214 hunk_staging_operation_count_as_of_write: 0,
2215 head_text: Default::default(),
2216 index_text: Default::default(),
2217 head_changed: Default::default(),
2218 index_changed: Default::default(),
2219 language_changed: Default::default(),
2220 conflict_updated_futures: Default::default(),
2221 conflict_set: Default::default(),
2222 reparse_conflict_markers_task: Default::default(),
2223 }
2224 }
2225
2226 fn buffer_language_changed(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) {
2227 self.language = buffer.read(cx).language().cloned();
2228 self.language_changed = true;
2229 let _ = self.recalculate_diffs(buffer.read(cx).text_snapshot(), cx);
2230 }
2231
2232 fn reparse_conflict_markers(
2233 &mut self,
2234 buffer: text::BufferSnapshot,
2235 cx: &mut Context<Self>,
2236 ) -> oneshot::Receiver<()> {
2237 let (tx, rx) = oneshot::channel();
2238
2239 let Some(conflict_set) = self
2240 .conflict_set
2241 .as_ref()
2242 .and_then(|conflict_set| conflict_set.upgrade())
2243 else {
2244 return rx;
2245 };
2246
2247 let old_snapshot = conflict_set.read_with(cx, |conflict_set, _| {
2248 if conflict_set.has_conflict {
2249 Some(conflict_set.snapshot())
2250 } else {
2251 None
2252 }
2253 });
2254
2255 if let Some(old_snapshot) = old_snapshot {
2256 self.conflict_updated_futures.push(tx);
2257 self.reparse_conflict_markers_task = Some(cx.spawn(async move |this, cx| {
2258 let (snapshot, changed_range) = cx
2259 .background_spawn(async move {
2260 let new_snapshot = ConflictSet::parse(&buffer);
2261 let changed_range = old_snapshot.compare(&new_snapshot, &buffer);
2262 (new_snapshot, changed_range)
2263 })
2264 .await;
2265 this.update(cx, |this, cx| {
2266 if let Some(conflict_set) = &this.conflict_set {
2267 conflict_set
2268 .update(cx, |conflict_set, cx| {
2269 conflict_set.set_snapshot(snapshot, changed_range, cx);
2270 })
2271 .ok();
2272 }
2273 let futures = std::mem::take(&mut this.conflict_updated_futures);
2274 for tx in futures {
2275 tx.send(()).ok();
2276 }
2277 })
2278 }))
2279 }
2280
2281 rx
2282 }
2283
2284 fn unstaged_diff(&self) -> Option<Entity<BufferDiff>> {
2285 self.unstaged_diff.as_ref().and_then(|set| set.upgrade())
2286 }
2287
2288 fn uncommitted_diff(&self) -> Option<Entity<BufferDiff>> {
2289 self.uncommitted_diff.as_ref().and_then(|set| set.upgrade())
2290 }
2291
2292 fn handle_base_texts_updated(
2293 &mut self,
2294 buffer: text::BufferSnapshot,
2295 message: proto::UpdateDiffBases,
2296 cx: &mut Context<Self>,
2297 ) {
2298 use proto::update_diff_bases::Mode;
2299
2300 let Some(mode) = Mode::from_i32(message.mode) else {
2301 return;
2302 };
2303
2304 let diff_bases_change = match mode {
2305 Mode::HeadOnly => DiffBasesChange::SetHead(message.committed_text),
2306 Mode::IndexOnly => DiffBasesChange::SetIndex(message.staged_text),
2307 Mode::IndexMatchesHead => DiffBasesChange::SetBoth(message.committed_text),
2308 Mode::IndexAndHead => DiffBasesChange::SetEach {
2309 index: message.staged_text,
2310 head: message.committed_text,
2311 },
2312 };
2313
2314 self.diff_bases_changed(buffer, Some(diff_bases_change), cx);
2315 }
2316
2317 pub fn wait_for_recalculation(&mut self) -> Option<impl Future<Output = ()> + use<>> {
2318 if *self.recalculating_tx.borrow() {
2319 let mut rx = self.recalculating_tx.subscribe();
2320 return Some(async move {
2321 loop {
2322 let is_recalculating = rx.recv().await;
2323 if is_recalculating != Some(true) {
2324 break;
2325 }
2326 }
2327 });
2328 } else {
2329 None
2330 }
2331 }
2332
2333 fn diff_bases_changed(
2334 &mut self,
2335 buffer: text::BufferSnapshot,
2336 diff_bases_change: Option<DiffBasesChange>,
2337 cx: &mut Context<Self>,
2338 ) {
2339 match diff_bases_change {
2340 Some(DiffBasesChange::SetIndex(index)) => {
2341 self.index_text = index.map(|mut index| {
2342 text::LineEnding::normalize(&mut index);
2343 Arc::new(index)
2344 });
2345 self.index_changed = true;
2346 }
2347 Some(DiffBasesChange::SetHead(head)) => {
2348 self.head_text = head.map(|mut head| {
2349 text::LineEnding::normalize(&mut head);
2350 Arc::new(head)
2351 });
2352 self.head_changed = true;
2353 }
2354 Some(DiffBasesChange::SetBoth(text)) => {
2355 let text = text.map(|mut text| {
2356 text::LineEnding::normalize(&mut text);
2357 Arc::new(text)
2358 });
2359 self.head_text = text.clone();
2360 self.index_text = text;
2361 self.head_changed = true;
2362 self.index_changed = true;
2363 }
2364 Some(DiffBasesChange::SetEach { index, head }) => {
2365 self.index_text = index.map(|mut index| {
2366 text::LineEnding::normalize(&mut index);
2367 Arc::new(index)
2368 });
2369 self.index_changed = true;
2370 self.head_text = head.map(|mut head| {
2371 text::LineEnding::normalize(&mut head);
2372 Arc::new(head)
2373 });
2374 self.head_changed = true;
2375 }
2376 None => {}
2377 }
2378
2379 self.recalculate_diffs(buffer, cx)
2380 }
2381
/// Spawns a task that recomputes the unstaged and uncommitted diffs for
/// `buffer` and stores it in `recalculate_diff_task`.
///
/// The "recalculating" watch is set to `true` immediately. It is set back to
/// `false` when the task finishes normally or is cancelled because of newer
/// hunk staging operations. The `index_changed`, `head_changed` and
/// `language_changed` flags are cleared only on normal completion.
fn recalculate_diffs(&mut self, buffer: text::BufferSnapshot, cx: &mut Context<Self>) {
    *self.recalculating_tx.borrow_mut() = true;

    // Capture everything the task needs up front, since it runs detached
    // from `&mut self`.
    let language = self.language.clone();
    let language_registry = self.language_registry.clone();
    let unstaged_diff = self.unstaged_diff();
    let uncommitted_diff = self.uncommitted_diff();
    let head = self.head_text.clone();
    let index = self.index_text.clone();
    let index_changed = self.index_changed;
    let head_changed = self.head_changed;
    let language_changed = self.language_changed;
    let prev_hunk_staging_operation_count = self.hunk_staging_operation_count_as_of_write;
    // Index and head "match" when they are the very same allocation, or when
    // both are absent; this is a pointer comparison, not a text comparison.
    let index_matches_head = match (self.index_text.as_ref(), self.head_text.as_ref()) {
        (Some(index), Some(head)) => Arc::ptr_eq(index, head),
        (None, None) => true,
        _ => false,
    };
    self.recalculate_diff_task = Some(cx.spawn(async move |this, cx| {
        log::debug!(
            "start recalculating diffs for buffer {}",
            buffer.remote_id()
        );

        // Diff of the buffer against the index text, if anyone holds the
        // unstaged diff entity.
        let mut new_unstaged_diff = None;
        if let Some(unstaged_diff) = &unstaged_diff {
            new_unstaged_diff = Some(
                BufferDiff::update_diff(
                    unstaged_diff.clone(),
                    buffer.clone(),
                    index,
                    index_changed,
                    language_changed,
                    language.clone(),
                    language_registry.clone(),
                    cx,
                )
                .await?,
            );
        }

        // Diff of the buffer against the head text. When index and head
        // match, the unstaged result (possibly `None`) is reused instead of
        // diffing again.
        let mut new_uncommitted_diff = None;
        if let Some(uncommitted_diff) = &uncommitted_diff {
            new_uncommitted_diff = if index_matches_head {
                new_unstaged_diff.clone()
            } else {
                Some(
                    BufferDiff::update_diff(
                        uncommitted_diff.clone(),
                        buffer.clone(),
                        head,
                        head_changed,
                        language_changed,
                        language.clone(),
                        language_registry.clone(),
                        cx,
                    )
                    .await?,
                )
            }
        }

        let cancel = this.update(cx, |this, _| {
            // This checks whether all pending stage/unstage operations
            // have quiesced (i.e. both the corresponding write and the
            // read of that write have completed). If not, then we cancel
            // this recalculation attempt to avoid invalidating pending
            // state too quickly; another recalculation will come along
            // later and clear the pending state once the state of the index has settled.
            if this.hunk_staging_operation_count > prev_hunk_staging_operation_count {
                *this.recalculating_tx.borrow_mut() = false;
                true
            } else {
                false
            }
        })?;
        if cancel {
            log::debug!(
                concat!(
                    "aborting recalculating diffs for buffer {}",
                    "due to subsequent hunk operations",
                ),
                buffer.remote_id()
            );
            return Ok(());
        }

        // Apply the unstaged snapshot first; the range it reports is passed
        // on to the uncommitted diff below.
        let unstaged_changed_range = if let Some((unstaged_diff, new_unstaged_diff)) =
            unstaged_diff.as_ref().zip(new_unstaged_diff.clone())
        {
            unstaged_diff.update(cx, |diff, cx| {
                if language_changed {
                    diff.language_changed(cx);
                }
                diff.set_snapshot(new_unstaged_diff, &buffer, cx)
            })?
        } else {
            None
        };

        if let Some((uncommitted_diff, new_uncommitted_diff)) =
            uncommitted_diff.as_ref().zip(new_uncommitted_diff.clone())
        {
            uncommitted_diff.update(cx, |diff, cx| {
                if language_changed {
                    diff.language_changed(cx);
                }
                diff.set_snapshot_with_secondary(
                    new_uncommitted_diff,
                    &buffer,
                    unstaged_changed_range,
                    true,
                    cx,
                );
            })?;
        }

        log::debug!(
            "finished recalculating diffs for buffer {}",
            buffer.remote_id()
        );

        // Clear the change flags and signal completion, provided the state
        // entity is still alive.
        if let Some(this) = this.upgrade() {
            this.update(cx, |this, _| {
                this.index_changed = false;
                this.head_changed = false;
                this.language_changed = false;
                *this.recalculating_tx.borrow_mut() = false;
            })?;
        }

        Ok(())
    }));
}
2516}
2517
2518fn make_remote_delegate(
2519 this: Entity<GitStore>,
2520 project_id: u64,
2521 repository_id: RepositoryId,
2522 askpass_id: u64,
2523 cx: &mut AsyncApp,
2524) -> AskPassDelegate {
2525 AskPassDelegate::new(cx, move |prompt, tx, cx| {
2526 this.update(cx, |this, cx| {
2527 let Some((client, _)) = this.downstream_client() else {
2528 return;
2529 };
2530 let response = client.request(proto::AskPassRequest {
2531 project_id,
2532 repository_id: repository_id.to_proto(),
2533 askpass_id,
2534 prompt,
2535 });
2536 cx.spawn(async move |_, _| {
2537 tx.send(response.await?.response).ok();
2538 anyhow::Ok(())
2539 })
2540 .detach_and_log_err(cx);
2541 })
2542 .log_err();
2543 })
2544}
2545
2546impl RepositoryId {
2547 pub fn to_proto(self) -> u64 {
2548 self.0
2549 }
2550
2551 pub fn from_proto(id: u64) -> Self {
2552 RepositoryId(id)
2553 }
2554}
2555
2556impl RepositorySnapshot {
2557 fn empty(id: RepositoryId, work_directory_abs_path: Arc<Path>) -> Self {
2558 Self {
2559 id,
2560 statuses_by_path: Default::default(),
2561 work_directory_abs_path,
2562 branch: None,
2563 head_commit: None,
2564 scan_id: 0,
2565 merge: Default::default(),
2566 }
2567 }
2568
2569 fn initial_update(&self, project_id: u64) -> proto::UpdateRepository {
2570 proto::UpdateRepository {
2571 branch_summary: self.branch.as_ref().map(branch_to_proto),
2572 head_commit_details: self.head_commit.as_ref().map(commit_details_to_proto),
2573 updated_statuses: self
2574 .statuses_by_path
2575 .iter()
2576 .map(|entry| entry.to_proto())
2577 .collect(),
2578 removed_statuses: Default::default(),
2579 current_merge_conflicts: self
2580 .merge
2581 .conflicted_paths
2582 .iter()
2583 .map(|repo_path| repo_path.to_proto())
2584 .collect(),
2585 project_id,
2586 id: self.id.to_proto(),
2587 abs_path: self.work_directory_abs_path.to_proto(),
2588 entry_ids: vec![self.id.to_proto()],
2589 scan_id: self.scan_id,
2590 is_last_update: true,
2591 }
2592 }
2593
2594 fn build_update(&self, old: &Self, project_id: u64) -> proto::UpdateRepository {
2595 let mut updated_statuses: Vec<proto::StatusEntry> = Vec::new();
2596 let mut removed_statuses: Vec<String> = Vec::new();
2597
2598 let mut new_statuses = self.statuses_by_path.iter().peekable();
2599 let mut old_statuses = old.statuses_by_path.iter().peekable();
2600
2601 let mut current_new_entry = new_statuses.next();
2602 let mut current_old_entry = old_statuses.next();
2603 loop {
2604 match (current_new_entry, current_old_entry) {
2605 (Some(new_entry), Some(old_entry)) => {
2606 match new_entry.repo_path.cmp(&old_entry.repo_path) {
2607 Ordering::Less => {
2608 updated_statuses.push(new_entry.to_proto());
2609 current_new_entry = new_statuses.next();
2610 }
2611 Ordering::Equal => {
2612 if new_entry.status != old_entry.status {
2613 updated_statuses.push(new_entry.to_proto());
2614 }
2615 current_old_entry = old_statuses.next();
2616 current_new_entry = new_statuses.next();
2617 }
2618 Ordering::Greater => {
2619 removed_statuses.push(old_entry.repo_path.as_ref().to_proto());
2620 current_old_entry = old_statuses.next();
2621 }
2622 }
2623 }
2624 (None, Some(old_entry)) => {
2625 removed_statuses.push(old_entry.repo_path.as_ref().to_proto());
2626 current_old_entry = old_statuses.next();
2627 }
2628 (Some(new_entry), None) => {
2629 updated_statuses.push(new_entry.to_proto());
2630 current_new_entry = new_statuses.next();
2631 }
2632 (None, None) => break,
2633 }
2634 }
2635
2636 proto::UpdateRepository {
2637 branch_summary: self.branch.as_ref().map(branch_to_proto),
2638 head_commit_details: self.head_commit.as_ref().map(commit_details_to_proto),
2639 updated_statuses,
2640 removed_statuses,
2641 current_merge_conflicts: self
2642 .merge
2643 .conflicted_paths
2644 .iter()
2645 .map(|path| path.as_ref().to_proto())
2646 .collect(),
2647 project_id,
2648 id: self.id.to_proto(),
2649 abs_path: self.work_directory_abs_path.to_proto(),
2650 entry_ids: vec![],
2651 scan_id: self.scan_id,
2652 is_last_update: true,
2653 }
2654 }
2655
2656 pub fn status(&self) -> impl Iterator<Item = StatusEntry> + '_ {
2657 self.statuses_by_path.iter().cloned()
2658 }
2659
2660 pub fn status_summary(&self) -> GitSummary {
2661 self.statuses_by_path.summary().item_summary
2662 }
2663
2664 pub fn status_for_path(&self, path: &RepoPath) -> Option<StatusEntry> {
2665 self.statuses_by_path
2666 .get(&PathKey(path.0.clone()), &())
2667 .cloned()
2668 }
2669
2670 pub fn abs_path_to_repo_path(&self, abs_path: &Path) -> Option<RepoPath> {
2671 abs_path
2672 .strip_prefix(&self.work_directory_abs_path)
2673 .map(RepoPath::from)
2674 .ok()
2675 }
2676
2677 pub fn has_conflict(&self, repo_path: &RepoPath) -> bool {
2678 self.merge.conflicted_paths.contains(repo_path)
2679 }
2680
2681 /// This is the name that will be displayed in the repository selector for this repository.
2682 pub fn display_name(&self) -> SharedString {
2683 self.work_directory_abs_path
2684 .file_name()
2685 .unwrap_or_default()
2686 .to_string_lossy()
2687 .to_string()
2688 .into()
2689 }
2690}
2691
2692impl MergeDetails {
2693 async fn load(
2694 backend: &Arc<dyn GitRepository>,
2695 status: &SumTree<StatusEntry>,
2696 prev_snapshot: &RepositorySnapshot,
2697 ) -> Result<(MergeDetails, bool)> {
2698 fn sha_eq<'a>(
2699 l: impl IntoIterator<Item = &'a CommitDetails>,
2700 r: impl IntoIterator<Item = &'a CommitDetails>,
2701 ) -> bool {
2702 l.into_iter()
2703 .map(|commit| &commit.sha)
2704 .eq(r.into_iter().map(|commit| &commit.sha))
2705 }
2706
2707 let merge_heads = try_join_all(
2708 backend
2709 .merge_head_shas()
2710 .into_iter()
2711 .map(|sha| backend.show(sha)),
2712 )
2713 .await?;
2714 let cherry_pick_head = backend.show("CHERRY_PICK_HEAD".into()).await.ok();
2715 let rebase_head = backend.show("REBASE_HEAD".into()).await.ok();
2716 let revert_head = backend.show("REVERT_HEAD".into()).await.ok();
2717 let apply_head = backend.show("APPLY_HEAD".into()).await.ok();
2718 let message = backend.merge_message().await.map(SharedString::from);
2719 let merge_heads_changed = !sha_eq(
2720 merge_heads.as_slice(),
2721 prev_snapshot.merge.merge_heads.as_slice(),
2722 ) || !sha_eq(
2723 cherry_pick_head.as_ref(),
2724 prev_snapshot.merge.cherry_pick_head.as_ref(),
2725 ) || !sha_eq(
2726 apply_head.as_ref(),
2727 prev_snapshot.merge.apply_head.as_ref(),
2728 ) || !sha_eq(
2729 rebase_head.as_ref(),
2730 prev_snapshot.merge.rebase_head.as_ref(),
2731 ) || !sha_eq(
2732 revert_head.as_ref(),
2733 prev_snapshot.merge.revert_head.as_ref(),
2734 );
2735 let conflicted_paths = if merge_heads_changed {
2736 TreeSet::from_ordered_entries(
2737 status
2738 .iter()
2739 .filter(|entry| entry.status.is_conflicted())
2740 .map(|entry| entry.repo_path.clone()),
2741 )
2742 } else {
2743 prev_snapshot.merge.conflicted_paths.clone()
2744 };
2745 let details = MergeDetails {
2746 conflicted_paths,
2747 message,
2748 apply_head,
2749 cherry_pick_head,
2750 merge_heads,
2751 rebase_head,
2752 revert_head,
2753 };
2754 Ok((details, merge_heads_changed))
2755 }
2756}
2757
2758impl Repository {
2759 pub fn snapshot(&self) -> RepositorySnapshot {
2760 self.snapshot.clone()
2761 }
2762
2763 fn local(
2764 id: RepositoryId,
2765 work_directory_abs_path: Arc<Path>,
2766 dot_git_abs_path: Arc<Path>,
2767 repository_dir_abs_path: Arc<Path>,
2768 common_dir_abs_path: Arc<Path>,
2769 project_environment: WeakEntity<ProjectEnvironment>,
2770 fs: Arc<dyn Fs>,
2771 git_store: WeakEntity<GitStore>,
2772 cx: &mut Context<Self>,
2773 ) -> Self {
2774 let snapshot = RepositorySnapshot::empty(id, work_directory_abs_path.clone());
2775 Repository {
2776 this: cx.weak_entity(),
2777 git_store,
2778 snapshot,
2779 commit_message_buffer: None,
2780 askpass_delegates: Default::default(),
2781 paths_needing_status_update: Default::default(),
2782 latest_askpass_id: 0,
2783 job_sender: Repository::spawn_local_git_worker(
2784 work_directory_abs_path,
2785 dot_git_abs_path,
2786 repository_dir_abs_path,
2787 common_dir_abs_path,
2788 project_environment,
2789 fs,
2790 cx,
2791 ),
2792 job_id: 0,
2793 active_jobs: Default::default(),
2794 }
2795 }
2796
2797 fn remote(
2798 id: RepositoryId,
2799 work_directory_abs_path: Arc<Path>,
2800 project_id: ProjectId,
2801 client: AnyProtoClient,
2802 git_store: WeakEntity<GitStore>,
2803 cx: &mut Context<Self>,
2804 ) -> Self {
2805 let snapshot = RepositorySnapshot::empty(id, work_directory_abs_path);
2806 Self {
2807 this: cx.weak_entity(),
2808 snapshot,
2809 commit_message_buffer: None,
2810 git_store,
2811 paths_needing_status_update: Default::default(),
2812 job_sender: Self::spawn_remote_git_worker(project_id, client, cx),
2813 askpass_delegates: Default::default(),
2814 latest_askpass_id: 0,
2815 active_jobs: Default::default(),
2816 job_id: 0,
2817 }
2818 }
2819
2820 pub fn git_store(&self) -> Option<Entity<GitStore>> {
2821 self.git_store.upgrade()
2822 }
2823
/// Queues a job that reloads the index and head texts for every buffer with
/// diff state inside this repository, and applies whatever changed.
///
/// The job does three things in order:
/// 1. collects, per buffer, the currently stored base texts for the diffs
///    that are still alive;
/// 2. loads the fresh texts from the git backend on a background task and
///    works out the minimal `DiffBasesChange` for each buffer;
/// 3. sends the change to the downstream client, if any, and applies it to
///    the local diff state.
///
/// Non-local repositories are logged as an error and skipped. The job's
/// result receiver is discarded.
fn reload_buffer_diff_bases(&mut self, cx: &mut Context<Self>) {
    let this = cx.weak_entity();
    let git_store = self.git_store.clone();
    let _ = self.send_keyed_job(
        // NOTE(review): the key presumably lets the worker coalesce repeated
        // reload requests; confirm against the worker loop.
        Some(GitJobKey::ReloadBufferDiffBases),
        None,
        |state, mut cx| async move {
            let RepositoryState::Local { backend, .. } = state else {
                log::error!("tried to recompute diffs for a non-local repository");
                return Ok(());
            };

            let Some(this) = this.upgrade() else {
                return Ok(());
            };

            // Step 1: for each tracked buffer that maps to a path in this
            // repository, record which base texts are in use and their
            // current values. The outer `Option` says whether the matching
            // diff is alive; the inner one is the stored text.
            let repo_diff_state_updates = this.update(&mut cx, |this, cx| {
                git_store.update(cx, |git_store, cx| {
                    git_store
                        .diffs
                        .iter()
                        .filter_map(|(buffer_id, diff_state)| {
                            let buffer_store = git_store.buffer_store.read(cx);
                            let buffer = buffer_store.get(*buffer_id)?;
                            let file = File::from_dyn(buffer.read(cx).file())?;
                            let abs_path =
                                file.worktree.read(cx).absolutize(&file.path).ok()?;
                            let repo_path = this.abs_path_to_repo_path(&abs_path)?;
                            log::debug!(
                                "start reload diff bases for repo path {}",
                                repo_path.0.display()
                            );
                            diff_state.update(cx, |diff_state, _| {
                                let has_unstaged_diff = diff_state
                                    .unstaged_diff
                                    .as_ref()
                                    .is_some_and(|diff| diff.is_upgradable());
                                let has_uncommitted_diff = diff_state
                                    .uncommitted_diff
                                    .as_ref()
                                    .is_some_and(|set| set.is_upgradable());

                                Some((
                                    buffer,
                                    repo_path,
                                    has_unstaged_diff.then(|| diff_state.index_text.clone()),
                                    has_uncommitted_diff.then(|| diff_state.head_text.clone()),
                                ))
                            })
                        })
                        .collect::<Vec<_>>()
                })
            })??;

            // Step 2: load the fresh texts off the main thread and compare
            // them with the stored ones.
            let buffer_diff_base_changes = cx
                .background_spawn(async move {
                    let mut changes = Vec::new();
                    for (buffer, repo_path, current_index_text, current_head_text) in
                        &repo_diff_state_updates
                    {
                        // Only load the texts whose diffs are actually in use.
                        let index_text = if current_index_text.is_some() {
                            backend.load_index_text(repo_path.clone()).await
                        } else {
                            None
                        };
                        let head_text = if current_head_text.is_some() {
                            backend.load_committed_text(repo_path.clone()).await
                        } else {
                            None
                        };

                        let change =
                            match (current_index_text.as_ref(), current_head_text.as_ref()) {
                                (Some(current_index), Some(current_head)) => {
                                    let index_changed =
                                        index_text.as_ref() != current_index.as_deref();
                                    let head_changed =
                                        head_text.as_ref() != current_head.as_deref();
                                    if index_changed && head_changed {
                                        // Identical new texts are set together
                                        // via `SetBoth`.
                                        if index_text == head_text {
                                            Some(DiffBasesChange::SetBoth(head_text))
                                        } else {
                                            Some(DiffBasesChange::SetEach {
                                                index: index_text,
                                                head: head_text,
                                            })
                                        }
                                    } else if index_changed {
                                        Some(DiffBasesChange::SetIndex(index_text))
                                    } else if head_changed {
                                        Some(DiffBasesChange::SetHead(head_text))
                                    } else {
                                        None
                                    }
                                }
                                (Some(current_index), None) => {
                                    let index_changed =
                                        index_text.as_ref() != current_index.as_deref();
                                    index_changed
                                        .then_some(DiffBasesChange::SetIndex(index_text))
                                }
                                (None, Some(current_head)) => {
                                    let head_changed =
                                        head_text.as_ref() != current_head.as_deref();
                                    head_changed.then_some(DiffBasesChange::SetHead(head_text))
                                }
                                (None, None) => None,
                            };

                        changes.push((buffer.clone(), change))
                    }
                    changes
                })
                .await;

            // Step 3: publish and apply the changes. Every buffer is passed
            // to `diff_bases_changed`, including those whose change is `None`.
            git_store.update(&mut cx, |git_store, cx| {
                for (buffer, diff_bases_change) in buffer_diff_base_changes {
                    let buffer_snapshot = buffer.read(cx).text_snapshot();
                    let buffer_id = buffer_snapshot.remote_id();
                    let Some(diff_state) = git_store.diffs.get(&buffer_id) else {
                        continue;
                    };

                    let downstream_client = git_store.downstream_client();
                    diff_state.update(cx, |diff_state, cx| {
                        use proto::update_diff_bases::Mode;

                        // Only send when there is both a change and a
                        // downstream client to receive it.
                        if let Some((diff_bases_change, (client, project_id))) =
                            diff_bases_change.clone().zip(downstream_client)
                        {
                            let (staged_text, committed_text, mode) = match diff_bases_change {
                                DiffBasesChange::SetIndex(index) => {
                                    (index, None, Mode::IndexOnly)
                                }
                                DiffBasesChange::SetHead(head) => (None, head, Mode::HeadOnly),
                                DiffBasesChange::SetEach { index, head } => {
                                    (index, head, Mode::IndexAndHead)
                                }
                                DiffBasesChange::SetBoth(text) => {
                                    (None, text, Mode::IndexMatchesHead)
                                }
                            };
                            client
                                .send(proto::UpdateDiffBases {
                                    project_id: project_id.to_proto(),
                                    buffer_id: buffer_id.to_proto(),
                                    staged_text,
                                    committed_text,
                                    mode: mode as i32,
                                })
                                .log_err();
                        }

                        diff_state.diff_bases_changed(buffer_snapshot, diff_bases_change, cx);
                    });
                }
            })
        },
    );
}
2984
2985 pub fn send_job<F, Fut, R>(
2986 &mut self,
2987 status: Option<SharedString>,
2988 job: F,
2989 ) -> oneshot::Receiver<R>
2990 where
2991 F: FnOnce(RepositoryState, AsyncApp) -> Fut + 'static,
2992 Fut: Future<Output = R> + 'static,
2993 R: Send + 'static,
2994 {
2995 self.send_keyed_job(None, status, job)
2996 }
2997
2998 fn send_keyed_job<F, Fut, R>(
2999 &mut self,
3000 key: Option<GitJobKey>,
3001 status: Option<SharedString>,
3002 job: F,
3003 ) -> oneshot::Receiver<R>
3004 where
3005 F: FnOnce(RepositoryState, AsyncApp) -> Fut + 'static,
3006 Fut: Future<Output = R> + 'static,
3007 R: Send + 'static,
3008 {
3009 let (result_tx, result_rx) = futures::channel::oneshot::channel();
3010 let job_id = post_inc(&mut self.job_id);
3011 let this = self.this.clone();
3012 self.job_sender
3013 .unbounded_send(GitJob {
3014 key,
3015 job: Box::new(move |state, cx: &mut AsyncApp| {
3016 let job = job(state, cx.clone());
3017 cx.spawn(async move |cx| {
3018 if let Some(s) = status.clone() {
3019 this.update(cx, |this, cx| {
3020 this.active_jobs.insert(
3021 job_id,
3022 JobInfo {
3023 start: Instant::now(),
3024 message: s.clone(),
3025 },
3026 );
3027
3028 cx.notify();
3029 })
3030 .ok();
3031 }
3032 let result = job.await;
3033
3034 this.update(cx, |this, cx| {
3035 this.active_jobs.remove(&job_id);
3036 cx.notify();
3037 })
3038 .ok();
3039
3040 result_tx.send(result).ok();
3041 })
3042 }),
3043 })
3044 .ok();
3045 result_rx
3046 }
3047
3048 pub fn set_as_active_repository(&self, cx: &mut Context<Self>) {
3049 let Some(git_store) = self.git_store.upgrade() else {
3050 return;
3051 };
3052 let entity = cx.entity();
3053 git_store.update(cx, |git_store, cx| {
3054 let Some((&id, _)) = git_store
3055 .repositories
3056 .iter()
3057 .find(|(_, handle)| *handle == &entity)
3058 else {
3059 return;
3060 };
3061 git_store.active_repo_id = Some(id);
3062 cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
3063 });
3064 }
3065
3066 pub fn cached_status(&self) -> impl '_ + Iterator<Item = StatusEntry> {
3067 self.snapshot.status()
3068 }
3069
3070 pub fn repo_path_to_project_path(&self, path: &RepoPath, cx: &App) -> Option<ProjectPath> {
3071 let git_store = self.git_store.upgrade()?;
3072 let worktree_store = git_store.read(cx).worktree_store.read(cx);
3073 let abs_path = self.snapshot.work_directory_abs_path.join(&path.0);
3074 let (worktree, relative_path) = worktree_store.find_worktree(abs_path, cx)?;
3075 Some(ProjectPath {
3076 worktree_id: worktree.read(cx).id(),
3077 path: relative_path.into(),
3078 })
3079 }
3080
3081 pub fn project_path_to_repo_path(&self, path: &ProjectPath, cx: &App) -> Option<RepoPath> {
3082 let git_store = self.git_store.upgrade()?;
3083 let worktree_store = git_store.read(cx).worktree_store.read(cx);
3084 let abs_path = worktree_store.absolutize(path, cx)?;
3085 self.snapshot.abs_path_to_repo_path(&abs_path)
3086 }
3087
3088 pub fn contains_sub_repo(&self, other: &Entity<Self>, cx: &App) -> bool {
3089 other
3090 .read(cx)
3091 .snapshot
3092 .work_directory_abs_path
3093 .starts_with(&self.snapshot.work_directory_abs_path)
3094 }
3095
3096 pub fn open_commit_buffer(
3097 &mut self,
3098 languages: Option<Arc<LanguageRegistry>>,
3099 buffer_store: Entity<BufferStore>,
3100 cx: &mut Context<Self>,
3101 ) -> Task<Result<Entity<Buffer>>> {
3102 let id = self.id;
3103 if let Some(buffer) = self.commit_message_buffer.clone() {
3104 return Task::ready(Ok(buffer));
3105 }
3106 let this = cx.weak_entity();
3107
3108 let rx = self.send_job(None, move |state, mut cx| async move {
3109 let Some(this) = this.upgrade() else {
3110 bail!("git store was dropped");
3111 };
3112 match state {
3113 RepositoryState::Local { .. } => {
3114 this.update(&mut cx, |_, cx| {
3115 Self::open_local_commit_buffer(languages, buffer_store, cx)
3116 })?
3117 .await
3118 }
3119 RepositoryState::Remote { project_id, client } => {
3120 let request = client.request(proto::OpenCommitMessageBuffer {
3121 project_id: project_id.0,
3122 repository_id: id.to_proto(),
3123 });
3124 let response = request.await.context("requesting to open commit buffer")?;
3125 let buffer_id = BufferId::new(response.buffer_id)?;
3126 let buffer = buffer_store
3127 .update(&mut cx, |buffer_store, cx| {
3128 buffer_store.wait_for_remote_buffer(buffer_id, cx)
3129 })?
3130 .await?;
3131 if let Some(language_registry) = languages {
3132 let git_commit_language =
3133 language_registry.language_for_name("Git Commit").await?;
3134 buffer.update(&mut cx, |buffer, cx| {
3135 buffer.set_language(Some(git_commit_language), cx);
3136 })?;
3137 }
3138 this.update(&mut cx, |this, _| {
3139 this.commit_message_buffer = Some(buffer.clone());
3140 })?;
3141 Ok(buffer)
3142 }
3143 }
3144 });
3145
3146 cx.spawn(|_, _: &mut AsyncApp| async move { rx.await? })
3147 }
3148
3149 fn open_local_commit_buffer(
3150 language_registry: Option<Arc<LanguageRegistry>>,
3151 buffer_store: Entity<BufferStore>,
3152 cx: &mut Context<Self>,
3153 ) -> Task<Result<Entity<Buffer>>> {
3154 cx.spawn(async move |repository, cx| {
3155 let buffer = buffer_store
3156 .update(cx, |buffer_store, cx| buffer_store.create_buffer(cx))?
3157 .await?;
3158
3159 if let Some(language_registry) = language_registry {
3160 let git_commit_language = language_registry.language_for_name("Git Commit").await?;
3161 buffer.update(cx, |buffer, cx| {
3162 buffer.set_language(Some(git_commit_language), cx);
3163 })?;
3164 }
3165
3166 repository.update(cx, |repository, _| {
3167 repository.commit_message_buffer = Some(buffer.clone());
3168 })?;
3169 Ok(buffer)
3170 })
3171 }
3172
3173 pub fn checkout_files(
3174 &mut self,
3175 commit: &str,
3176 paths: Vec<RepoPath>,
3177 _cx: &mut App,
3178 ) -> oneshot::Receiver<Result<()>> {
3179 let commit = commit.to_string();
3180 let id = self.id;
3181
3182 self.send_job(
3183 Some(format!("git checkout {}", commit).into()),
3184 move |git_repo, _| async move {
3185 match git_repo {
3186 RepositoryState::Local {
3187 backend,
3188 environment,
3189 ..
3190 } => {
3191 backend
3192 .checkout_files(commit, paths, environment.clone())
3193 .await
3194 }
3195 RepositoryState::Remote { project_id, client } => {
3196 client
3197 .request(proto::GitCheckoutFiles {
3198 project_id: project_id.0,
3199 repository_id: id.to_proto(),
3200 commit,
3201 paths: paths
3202 .into_iter()
3203 .map(|p| p.to_string_lossy().to_string())
3204 .collect(),
3205 })
3206 .await?;
3207
3208 Ok(())
3209 }
3210 }
3211 },
3212 )
3213 }
3214
3215 pub fn reset(
3216 &mut self,
3217 commit: String,
3218 reset_mode: ResetMode,
3219 _cx: &mut App,
3220 ) -> oneshot::Receiver<Result<()>> {
3221 let commit = commit.to_string();
3222 let id = self.id;
3223
3224 self.send_job(None, move |git_repo, _| async move {
3225 match git_repo {
3226 RepositoryState::Local {
3227 backend,
3228 environment,
3229 ..
3230 } => backend.reset(commit, reset_mode, environment).await,
3231 RepositoryState::Remote { project_id, client } => {
3232 client
3233 .request(proto::GitReset {
3234 project_id: project_id.0,
3235 repository_id: id.to_proto(),
3236 commit,
3237 mode: match reset_mode {
3238 ResetMode::Soft => git_reset::ResetMode::Soft.into(),
3239 ResetMode::Mixed => git_reset::ResetMode::Mixed.into(),
3240 },
3241 })
3242 .await?;
3243
3244 Ok(())
3245 }
3246 }
3247 })
3248 }
3249
3250 pub fn show(&mut self, commit: String) -> oneshot::Receiver<Result<CommitDetails>> {
3251 let id = self.id;
3252 self.send_job(None, move |git_repo, _cx| async move {
3253 match git_repo {
3254 RepositoryState::Local { backend, .. } => backend.show(commit).await,
3255 RepositoryState::Remote { project_id, client } => {
3256 let resp = client
3257 .request(proto::GitShow {
3258 project_id: project_id.0,
3259 repository_id: id.to_proto(),
3260 commit,
3261 })
3262 .await?;
3263
3264 Ok(CommitDetails {
3265 sha: resp.sha.into(),
3266 message: resp.message.into(),
3267 commit_timestamp: resp.commit_timestamp,
3268 author_email: resp.author_email.into(),
3269 author_name: resp.author_name.into(),
3270 })
3271 }
3272 }
3273 })
3274 }
3275
3276 pub fn load_commit_diff(&mut self, commit: String) -> oneshot::Receiver<Result<CommitDiff>> {
3277 let id = self.id;
3278 self.send_job(None, move |git_repo, cx| async move {
3279 match git_repo {
3280 RepositoryState::Local { backend, .. } => backend.load_commit(commit, cx).await,
3281 RepositoryState::Remote {
3282 client, project_id, ..
3283 } => {
3284 let response = client
3285 .request(proto::LoadCommitDiff {
3286 project_id: project_id.0,
3287 repository_id: id.to_proto(),
3288 commit,
3289 })
3290 .await?;
3291 Ok(CommitDiff {
3292 files: response
3293 .files
3294 .into_iter()
3295 .map(|file| CommitFile {
3296 path: Path::new(&file.path).into(),
3297 old_text: file.old_text,
3298 new_text: file.new_text,
3299 })
3300 .collect(),
3301 })
3302 }
3303 }
3304 })
3305 }
3306
3307 fn buffer_store(&self, cx: &App) -> Option<Entity<BufferStore>> {
3308 Some(self.git_store.upgrade()?.read(cx).buffer_store.clone())
3309 }
3310
3311 pub fn stage_entries(
3312 &self,
3313 entries: Vec<RepoPath>,
3314 cx: &mut Context<Self>,
3315 ) -> Task<anyhow::Result<()>> {
3316 if entries.is_empty() {
3317 return Task::ready(Ok(()));
3318 }
3319 let id = self.id;
3320
3321 let mut save_futures = Vec::new();
3322 if let Some(buffer_store) = self.buffer_store(cx) {
3323 buffer_store.update(cx, |buffer_store, cx| {
3324 for path in &entries {
3325 let Some(project_path) = self.repo_path_to_project_path(path, cx) else {
3326 continue;
3327 };
3328 if let Some(buffer) = buffer_store.get_by_path(&project_path, cx) {
3329 if buffer
3330 .read(cx)
3331 .file()
3332 .map_or(false, |file| file.disk_state().exists())
3333 {
3334 save_futures.push(buffer_store.save_buffer(buffer, cx));
3335 }
3336 }
3337 }
3338 })
3339 }
3340
3341 cx.spawn(async move |this, cx| {
3342 for save_future in save_futures {
3343 save_future.await?;
3344 }
3345
3346 this.update(cx, |this, _| {
3347 this.send_job(None, move |git_repo, _cx| async move {
3348 match git_repo {
3349 RepositoryState::Local {
3350 backend,
3351 environment,
3352 ..
3353 } => backend.stage_paths(entries, environment.clone()).await,
3354 RepositoryState::Remote { project_id, client } => {
3355 client
3356 .request(proto::Stage {
3357 project_id: project_id.0,
3358 repository_id: id.to_proto(),
3359 paths: entries
3360 .into_iter()
3361 .map(|repo_path| repo_path.as_ref().to_proto())
3362 .collect(),
3363 })
3364 .await
3365 .context("sending stage request")?;
3366
3367 Ok(())
3368 }
3369 }
3370 })
3371 })?
3372 .await??;
3373
3374 Ok(())
3375 })
3376 }
3377
3378 pub fn unstage_entries(
3379 &self,
3380 entries: Vec<RepoPath>,
3381 cx: &mut Context<Self>,
3382 ) -> Task<anyhow::Result<()>> {
3383 if entries.is_empty() {
3384 return Task::ready(Ok(()));
3385 }
3386 let id = self.id;
3387
3388 let mut save_futures = Vec::new();
3389 if let Some(buffer_store) = self.buffer_store(cx) {
3390 buffer_store.update(cx, |buffer_store, cx| {
3391 for path in &entries {
3392 let Some(project_path) = self.repo_path_to_project_path(path, cx) else {
3393 continue;
3394 };
3395 if let Some(buffer) = buffer_store.get_by_path(&project_path, cx) {
3396 if buffer
3397 .read(cx)
3398 .file()
3399 .map_or(false, |file| file.disk_state().exists())
3400 {
3401 save_futures.push(buffer_store.save_buffer(buffer, cx));
3402 }
3403 }
3404 }
3405 })
3406 }
3407
3408 cx.spawn(async move |this, cx| {
3409 for save_future in save_futures {
3410 save_future.await?;
3411 }
3412
3413 this.update(cx, |this, _| {
3414 this.send_job(None, move |git_repo, _cx| async move {
3415 match git_repo {
3416 RepositoryState::Local {
3417 backend,
3418 environment,
3419 ..
3420 } => backend.unstage_paths(entries, environment).await,
3421 RepositoryState::Remote { project_id, client } => {
3422 client
3423 .request(proto::Unstage {
3424 project_id: project_id.0,
3425 repository_id: id.to_proto(),
3426 paths: entries
3427 .into_iter()
3428 .map(|repo_path| repo_path.as_ref().to_proto())
3429 .collect(),
3430 })
3431 .await
3432 .context("sending unstage request")?;
3433
3434 Ok(())
3435 }
3436 }
3437 })
3438 })?
3439 .await??;
3440
3441 Ok(())
3442 })
3443 }
3444
3445 pub fn stage_all(&self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
3446 let to_stage = self
3447 .cached_status()
3448 .filter(|entry| !entry.status.staging().is_fully_staged())
3449 .map(|entry| entry.repo_path.clone())
3450 .collect();
3451 self.stage_entries(to_stage, cx)
3452 }
3453
3454 pub fn unstage_all(&self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
3455 let to_unstage = self
3456 .cached_status()
3457 .filter(|entry| entry.status.staging().has_staged())
3458 .map(|entry| entry.repo_path.clone())
3459 .collect();
3460 self.unstage_entries(to_unstage, cx)
3461 }
3462
3463 pub fn commit(
3464 &mut self,
3465 message: SharedString,
3466 name_and_email: Option<(SharedString, SharedString)>,
3467 options: CommitOptions,
3468 _cx: &mut App,
3469 ) -> oneshot::Receiver<Result<()>> {
3470 let id = self.id;
3471
3472 self.send_job(Some("git commit".into()), move |git_repo, _cx| async move {
3473 match git_repo {
3474 RepositoryState::Local {
3475 backend,
3476 environment,
3477 ..
3478 } => {
3479 backend
3480 .commit(message, name_and_email, options, environment)
3481 .await
3482 }
3483 RepositoryState::Remote { project_id, client } => {
3484 let (name, email) = name_and_email.unzip();
3485 client
3486 .request(proto::Commit {
3487 project_id: project_id.0,
3488 repository_id: id.to_proto(),
3489 message: String::from(message),
3490 name: name.map(String::from),
3491 email: email.map(String::from),
3492 options: Some(proto::commit::CommitOptions {
3493 amend: options.amend,
3494 }),
3495 })
3496 .await
3497 .context("sending commit request")?;
3498
3499 Ok(())
3500 }
3501 }
3502 })
3503 }
3504
3505 pub fn fetch(
3506 &mut self,
3507 askpass: AskPassDelegate,
3508 _cx: &mut App,
3509 ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
3510 let askpass_delegates = self.askpass_delegates.clone();
3511 let askpass_id = util::post_inc(&mut self.latest_askpass_id);
3512 let id = self.id;
3513
3514 self.send_job(Some("git fetch".into()), move |git_repo, cx| async move {
3515 match git_repo {
3516 RepositoryState::Local {
3517 backend,
3518 environment,
3519 ..
3520 } => backend.fetch(askpass, environment, cx).await,
3521 RepositoryState::Remote { project_id, client } => {
3522 askpass_delegates.lock().insert(askpass_id, askpass);
3523 let _defer = util::defer(|| {
3524 let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
3525 debug_assert!(askpass_delegate.is_some());
3526 });
3527
3528 let response = client
3529 .request(proto::Fetch {
3530 project_id: project_id.0,
3531 repository_id: id.to_proto(),
3532 askpass_id,
3533 })
3534 .await
3535 .context("sending fetch request")?;
3536
3537 Ok(RemoteCommandOutput {
3538 stdout: response.stdout,
3539 stderr: response.stderr,
3540 })
3541 }
3542 }
3543 })
3544 }
3545
3546 pub fn push(
3547 &mut self,
3548 branch: SharedString,
3549 remote: SharedString,
3550 options: Option<PushOptions>,
3551 askpass: AskPassDelegate,
3552 cx: &mut Context<Self>,
3553 ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
3554 let askpass_delegates = self.askpass_delegates.clone();
3555 let askpass_id = util::post_inc(&mut self.latest_askpass_id);
3556 let id = self.id;
3557
3558 let args = options
3559 .map(|option| match option {
3560 PushOptions::SetUpstream => " --set-upstream",
3561 PushOptions::Force => " --force",
3562 })
3563 .unwrap_or("");
3564
3565 let updates_tx = self
3566 .git_store()
3567 .and_then(|git_store| match &git_store.read(cx).state {
3568 GitStoreState::Local { downstream, .. } => downstream
3569 .as_ref()
3570 .map(|downstream| downstream.updates_tx.clone()),
3571 _ => None,
3572 });
3573
3574 let this = cx.weak_entity();
3575 self.send_job(
3576 Some(format!("git push {} {} {}", args, branch, remote).into()),
3577 move |git_repo, mut cx| async move {
3578 match git_repo {
3579 RepositoryState::Local {
3580 backend,
3581 environment,
3582 ..
3583 } => {
3584 let result = backend
3585 .push(
3586 branch.to_string(),
3587 remote.to_string(),
3588 options,
3589 askpass,
3590 environment.clone(),
3591 cx.clone(),
3592 )
3593 .await;
3594 if result.is_ok() {
3595 let branches = backend.branches().await?;
3596 let branch = branches.into_iter().find(|branch| branch.is_head);
3597 log::info!("head branch after scan is {branch:?}");
3598 let snapshot = this.update(&mut cx, |this, cx| {
3599 this.snapshot.branch = branch;
3600 let snapshot = this.snapshot.clone();
3601 cx.emit(RepositoryEvent::Updated { full_scan: false });
3602 snapshot
3603 })?;
3604 if let Some(updates_tx) = updates_tx {
3605 updates_tx
3606 .unbounded_send(DownstreamUpdate::UpdateRepository(snapshot))
3607 .ok();
3608 }
3609 }
3610 result
3611 }
3612 RepositoryState::Remote { project_id, client } => {
3613 askpass_delegates.lock().insert(askpass_id, askpass);
3614 let _defer = util::defer(|| {
3615 let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
3616 debug_assert!(askpass_delegate.is_some());
3617 });
3618 let response = client
3619 .request(proto::Push {
3620 project_id: project_id.0,
3621 repository_id: id.to_proto(),
3622 askpass_id,
3623 branch_name: branch.to_string(),
3624 remote_name: remote.to_string(),
3625 options: options.map(|options| match options {
3626 PushOptions::Force => proto::push::PushOptions::Force,
3627 PushOptions::SetUpstream => {
3628 proto::push::PushOptions::SetUpstream
3629 }
3630 }
3631 as i32),
3632 })
3633 .await
3634 .context("sending push request")?;
3635
3636 Ok(RemoteCommandOutput {
3637 stdout: response.stdout,
3638 stderr: response.stderr,
3639 })
3640 }
3641 }
3642 },
3643 )
3644 }
3645
3646 pub fn pull(
3647 &mut self,
3648 branch: SharedString,
3649 remote: SharedString,
3650 askpass: AskPassDelegate,
3651 _cx: &mut App,
3652 ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
3653 let askpass_delegates = self.askpass_delegates.clone();
3654 let askpass_id = util::post_inc(&mut self.latest_askpass_id);
3655 let id = self.id;
3656
3657 self.send_job(
3658 Some(format!("git pull {} {}", remote, branch).into()),
3659 move |git_repo, cx| async move {
3660 match git_repo {
3661 RepositoryState::Local {
3662 backend,
3663 environment,
3664 ..
3665 } => {
3666 backend
3667 .pull(
3668 branch.to_string(),
3669 remote.to_string(),
3670 askpass,
3671 environment.clone(),
3672 cx,
3673 )
3674 .await
3675 }
3676 RepositoryState::Remote { project_id, client } => {
3677 askpass_delegates.lock().insert(askpass_id, askpass);
3678 let _defer = util::defer(|| {
3679 let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
3680 debug_assert!(askpass_delegate.is_some());
3681 });
3682 let response = client
3683 .request(proto::Pull {
3684 project_id: project_id.0,
3685 repository_id: id.to_proto(),
3686 askpass_id,
3687 branch_name: branch.to_string(),
3688 remote_name: remote.to_string(),
3689 })
3690 .await
3691 .context("sending pull request")?;
3692
3693 Ok(RemoteCommandOutput {
3694 stdout: response.stdout,
3695 stderr: response.stderr,
3696 })
3697 }
3698 }
3699 },
3700 )
3701 }
3702
3703 fn spawn_set_index_text_job(
3704 &mut self,
3705 path: RepoPath,
3706 content: Option<String>,
3707 hunk_staging_operation_count: Option<usize>,
3708 cx: &mut Context<Self>,
3709 ) -> oneshot::Receiver<anyhow::Result<()>> {
3710 let id = self.id;
3711 let this = cx.weak_entity();
3712 let git_store = self.git_store.clone();
3713 self.send_keyed_job(
3714 Some(GitJobKey::WriteIndex(path.clone())),
3715 None,
3716 move |git_repo, mut cx| async move {
3717 log::debug!("start updating index text for buffer {}", path.display());
3718 match git_repo {
3719 RepositoryState::Local {
3720 backend,
3721 environment,
3722 ..
3723 } => {
3724 backend
3725 .set_index_text(path.clone(), content, environment.clone())
3726 .await?;
3727 }
3728 RepositoryState::Remote { project_id, client } => {
3729 client
3730 .request(proto::SetIndexText {
3731 project_id: project_id.0,
3732 repository_id: id.to_proto(),
3733 path: path.as_ref().to_proto(),
3734 text: content,
3735 })
3736 .await?;
3737 }
3738 }
3739 log::debug!("finish updating index text for buffer {}", path.display());
3740
3741 if let Some(hunk_staging_operation_count) = hunk_staging_operation_count {
3742 let project_path = this
3743 .read_with(&cx, |this, cx| this.repo_path_to_project_path(&path, cx))
3744 .ok()
3745 .flatten();
3746 git_store.update(&mut cx, |git_store, cx| {
3747 let buffer_id = git_store
3748 .buffer_store
3749 .read(cx)
3750 .get_by_path(&project_path?, cx)?
3751 .read(cx)
3752 .remote_id();
3753 let diff_state = git_store.diffs.get(&buffer_id)?;
3754 diff_state.update(cx, |diff_state, _| {
3755 diff_state.hunk_staging_operation_count_as_of_write =
3756 hunk_staging_operation_count;
3757 });
3758 Some(())
3759 })?;
3760 }
3761 Ok(())
3762 },
3763 )
3764 }
3765
3766 pub fn get_remotes(
3767 &mut self,
3768 branch_name: Option<String>,
3769 ) -> oneshot::Receiver<Result<Vec<Remote>>> {
3770 let id = self.id;
3771 self.send_job(None, move |repo, _cx| async move {
3772 match repo {
3773 RepositoryState::Local { backend, .. } => backend.get_remotes(branch_name).await,
3774 RepositoryState::Remote { project_id, client } => {
3775 let response = client
3776 .request(proto::GetRemotes {
3777 project_id: project_id.0,
3778 repository_id: id.to_proto(),
3779 branch_name,
3780 })
3781 .await?;
3782
3783 let remotes = response
3784 .remotes
3785 .into_iter()
3786 .map(|remotes| git::repository::Remote {
3787 name: remotes.name.into(),
3788 })
3789 .collect();
3790
3791 Ok(remotes)
3792 }
3793 }
3794 })
3795 }
3796
3797 pub fn branches(&mut self) -> oneshot::Receiver<Result<Vec<Branch>>> {
3798 let id = self.id;
3799 self.send_job(None, move |repo, cx| async move {
3800 match repo {
3801 RepositoryState::Local { backend, .. } => {
3802 let backend = backend.clone();
3803 cx.background_spawn(async move { backend.branches().await })
3804 .await
3805 }
3806 RepositoryState::Remote { project_id, client } => {
3807 let response = client
3808 .request(proto::GitGetBranches {
3809 project_id: project_id.0,
3810 repository_id: id.to_proto(),
3811 })
3812 .await?;
3813
3814 let branches = response
3815 .branches
3816 .into_iter()
3817 .map(|branch| proto_to_branch(&branch))
3818 .collect();
3819
3820 Ok(branches)
3821 }
3822 }
3823 })
3824 }
3825
3826 pub fn diff(&mut self, diff_type: DiffType, _cx: &App) -> oneshot::Receiver<Result<String>> {
3827 let id = self.id;
3828 self.send_job(None, move |repo, _cx| async move {
3829 match repo {
3830 RepositoryState::Local { backend, .. } => backend.diff(diff_type).await,
3831 RepositoryState::Remote { project_id, client } => {
3832 let response = client
3833 .request(proto::GitDiff {
3834 project_id: project_id.0,
3835 repository_id: id.to_proto(),
3836 diff_type: match diff_type {
3837 DiffType::HeadToIndex => {
3838 proto::git_diff::DiffType::HeadToIndex.into()
3839 }
3840 DiffType::HeadToWorktree => {
3841 proto::git_diff::DiffType::HeadToWorktree.into()
3842 }
3843 },
3844 })
3845 .await?;
3846
3847 Ok(response.diff)
3848 }
3849 }
3850 })
3851 }
3852
3853 pub fn create_branch(&mut self, branch_name: String) -> oneshot::Receiver<Result<()>> {
3854 let id = self.id;
3855 self.send_job(
3856 Some(format!("git switch -c {branch_name}").into()),
3857 move |repo, _cx| async move {
3858 match repo {
3859 RepositoryState::Local { backend, .. } => {
3860 backend.create_branch(branch_name).await
3861 }
3862 RepositoryState::Remote { project_id, client } => {
3863 client
3864 .request(proto::GitCreateBranch {
3865 project_id: project_id.0,
3866 repository_id: id.to_proto(),
3867 branch_name,
3868 })
3869 .await?;
3870
3871 Ok(())
3872 }
3873 }
3874 },
3875 )
3876 }
3877
3878 pub fn change_branch(&mut self, branch_name: String) -> oneshot::Receiver<Result<()>> {
3879 let id = self.id;
3880 self.send_job(
3881 Some(format!("git switch {branch_name}").into()),
3882 move |repo, _cx| async move {
3883 match repo {
3884 RepositoryState::Local { backend, .. } => {
3885 backend.change_branch(branch_name).await
3886 }
3887 RepositoryState::Remote { project_id, client } => {
3888 client
3889 .request(proto::GitChangeBranch {
3890 project_id: project_id.0,
3891 repository_id: id.to_proto(),
3892 branch_name,
3893 })
3894 .await?;
3895
3896 Ok(())
3897 }
3898 }
3899 },
3900 )
3901 }
3902
3903 pub fn check_for_pushed_commits(&mut self) -> oneshot::Receiver<Result<Vec<SharedString>>> {
3904 let id = self.id;
3905 self.send_job(None, move |repo, _cx| async move {
3906 match repo {
3907 RepositoryState::Local { backend, .. } => backend.check_for_pushed_commit().await,
3908 RepositoryState::Remote { project_id, client } => {
3909 let response = client
3910 .request(proto::CheckForPushedCommits {
3911 project_id: project_id.0,
3912 repository_id: id.to_proto(),
3913 })
3914 .await?;
3915
3916 let branches = response.pushed_to.into_iter().map(Into::into).collect();
3917
3918 Ok(branches)
3919 }
3920 }
3921 })
3922 }
3923
3924 pub fn checkpoint(&mut self) -> oneshot::Receiver<Result<GitRepositoryCheckpoint>> {
3925 self.send_job(None, |repo, _cx| async move {
3926 match repo {
3927 RepositoryState::Local { backend, .. } => backend.checkpoint().await,
3928 RepositoryState::Remote { .. } => Err(anyhow!("not implemented yet")),
3929 }
3930 })
3931 }
3932
3933 pub fn restore_checkpoint(
3934 &mut self,
3935 checkpoint: GitRepositoryCheckpoint,
3936 ) -> oneshot::Receiver<Result<()>> {
3937 self.send_job(None, move |repo, _cx| async move {
3938 match repo {
3939 RepositoryState::Local { backend, .. } => {
3940 backend.restore_checkpoint(checkpoint).await
3941 }
3942 RepositoryState::Remote { .. } => Err(anyhow!("not implemented yet")),
3943 }
3944 })
3945 }
3946
3947 pub(crate) fn apply_remote_update(
3948 &mut self,
3949 update: proto::UpdateRepository,
3950 cx: &mut Context<Self>,
3951 ) -> Result<()> {
3952 let conflicted_paths = TreeSet::from_ordered_entries(
3953 update
3954 .current_merge_conflicts
3955 .into_iter()
3956 .map(|path| RepoPath(Path::new(&path).into())),
3957 );
3958 self.snapshot.branch = update.branch_summary.as_ref().map(proto_to_branch);
3959 self.snapshot.head_commit = update
3960 .head_commit_details
3961 .as_ref()
3962 .map(proto_to_commit_details);
3963
3964 self.snapshot.merge.conflicted_paths = conflicted_paths;
3965
3966 let edits = update
3967 .removed_statuses
3968 .into_iter()
3969 .map(|path| sum_tree::Edit::Remove(PathKey(FromProto::from_proto(path))))
3970 .chain(
3971 update
3972 .updated_statuses
3973 .into_iter()
3974 .filter_map(|updated_status| {
3975 Some(sum_tree::Edit::Insert(updated_status.try_into().log_err()?))
3976 }),
3977 )
3978 .collect::<Vec<_>>();
3979 self.snapshot.statuses_by_path.edit(edits, &());
3980 if update.is_last_update {
3981 self.snapshot.scan_id = update.scan_id;
3982 }
3983 cx.emit(RepositoryEvent::Updated { full_scan: true });
3984 Ok(())
3985 }
3986
3987 pub fn compare_checkpoints(
3988 &mut self,
3989 left: GitRepositoryCheckpoint,
3990 right: GitRepositoryCheckpoint,
3991 ) -> oneshot::Receiver<Result<bool>> {
3992 self.send_job(None, move |repo, _cx| async move {
3993 match repo {
3994 RepositoryState::Local { backend, .. } => {
3995 backend.compare_checkpoints(left, right).await
3996 }
3997 RepositoryState::Remote { .. } => Err(anyhow!("not implemented yet")),
3998 }
3999 })
4000 }
4001
4002 pub fn diff_checkpoints(
4003 &mut self,
4004 base_checkpoint: GitRepositoryCheckpoint,
4005 target_checkpoint: GitRepositoryCheckpoint,
4006 ) -> oneshot::Receiver<Result<String>> {
4007 self.send_job(None, move |repo, _cx| async move {
4008 match repo {
4009 RepositoryState::Local { backend, .. } => {
4010 backend
4011 .diff_checkpoints(base_checkpoint, target_checkpoint)
4012 .await
4013 }
4014 RepositoryState::Remote { .. } => Err(anyhow!("not implemented yet")),
4015 }
4016 })
4017 }
4018
4019 fn schedule_scan(
4020 &mut self,
4021 updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
4022 cx: &mut Context<Self>,
4023 ) {
4024 let this = cx.weak_entity();
4025 let _ = self.send_keyed_job(
4026 Some(GitJobKey::ReloadGitState),
4027 None,
4028 |state, mut cx| async move {
4029 let Some(this) = this.upgrade() else {
4030 return Ok(());
4031 };
4032 let RepositoryState::Local { backend, .. } = state else {
4033 bail!("not a local repository")
4034 };
4035 let (snapshot, events) = this
4036 .update(&mut cx, |this, _| {
4037 compute_snapshot(
4038 this.id,
4039 this.work_directory_abs_path.clone(),
4040 this.snapshot.clone(),
4041 backend.clone(),
4042 )
4043 })?
4044 .await?;
4045 this.update(&mut cx, |this, cx| {
4046 this.snapshot = snapshot.clone();
4047 for event in events {
4048 cx.emit(event);
4049 }
4050 })?;
4051 if let Some(updates_tx) = updates_tx {
4052 updates_tx
4053 .unbounded_send(DownstreamUpdate::UpdateRepository(snapshot))
4054 .ok();
4055 }
4056 Ok(())
4057 },
4058 );
4059 }
4060
4061 fn spawn_local_git_worker(
4062 work_directory_abs_path: Arc<Path>,
4063 dot_git_abs_path: Arc<Path>,
4064 _repository_dir_abs_path: Arc<Path>,
4065 _common_dir_abs_path: Arc<Path>,
4066 project_environment: WeakEntity<ProjectEnvironment>,
4067 fs: Arc<dyn Fs>,
4068 cx: &mut Context<Self>,
4069 ) -> mpsc::UnboundedSender<GitJob> {
4070 let (job_tx, mut job_rx) = mpsc::unbounded::<GitJob>();
4071
4072 cx.spawn(async move |_, cx| {
4073 let environment = project_environment
4074 .upgrade()
4075 .ok_or_else(|| anyhow!("missing project environment"))?
4076 .update(cx, |project_environment, cx| {
4077 project_environment.get_directory_environment(work_directory_abs_path.clone(), cx)
4078 })?
4079 .await
4080 .unwrap_or_else(|| {
4081 log::error!("failed to get working directory environment for repository {work_directory_abs_path:?}");
4082 HashMap::default()
4083 });
4084 let backend = cx
4085 .background_spawn(async move {
4086 fs.open_repo(&dot_git_abs_path)
4087 .ok_or_else(|| anyhow!("failed to build repository"))
4088 })
4089 .await?;
4090
4091 if let Some(git_hosting_provider_registry) =
4092 cx.update(|cx| GitHostingProviderRegistry::try_global(cx))?
4093 {
4094 git_hosting_providers::register_additional_providers(
4095 git_hosting_provider_registry,
4096 backend.clone(),
4097 );
4098 }
4099
4100 let state = RepositoryState::Local {
4101 backend,
4102 environment: Arc::new(environment),
4103 };
4104 let mut jobs = VecDeque::new();
4105 loop {
4106 while let Ok(Some(next_job)) = job_rx.try_next() {
4107 jobs.push_back(next_job);
4108 }
4109
4110 if let Some(job) = jobs.pop_front() {
4111 if let Some(current_key) = &job.key {
4112 if jobs
4113 .iter()
4114 .any(|other_job| other_job.key.as_ref() == Some(current_key))
4115 {
4116 continue;
4117 }
4118 }
4119 (job.job)(state.clone(), cx).await;
4120 } else if let Some(job) = job_rx.next().await {
4121 jobs.push_back(job);
4122 } else {
4123 break;
4124 }
4125 }
4126 anyhow::Ok(())
4127 })
4128 .detach_and_log_err(cx);
4129
4130 job_tx
4131 }
4132
4133 fn spawn_remote_git_worker(
4134 project_id: ProjectId,
4135 client: AnyProtoClient,
4136 cx: &mut Context<Self>,
4137 ) -> mpsc::UnboundedSender<GitJob> {
4138 let (job_tx, mut job_rx) = mpsc::unbounded::<GitJob>();
4139
4140 cx.spawn(async move |_, cx| {
4141 let state = RepositoryState::Remote { project_id, client };
4142 let mut jobs = VecDeque::new();
4143 loop {
4144 while let Ok(Some(next_job)) = job_rx.try_next() {
4145 jobs.push_back(next_job);
4146 }
4147
4148 if let Some(job) = jobs.pop_front() {
4149 if let Some(current_key) = &job.key {
4150 if jobs
4151 .iter()
4152 .any(|other_job| other_job.key.as_ref() == Some(current_key))
4153 {
4154 continue;
4155 }
4156 }
4157 (job.job)(state.clone(), cx).await;
4158 } else if let Some(job) = job_rx.next().await {
4159 jobs.push_back(job);
4160 } else {
4161 break;
4162 }
4163 }
4164 anyhow::Ok(())
4165 })
4166 .detach_and_log_err(cx);
4167
4168 job_tx
4169 }
4170
4171 fn load_staged_text(
4172 &mut self,
4173 buffer_id: BufferId,
4174 repo_path: RepoPath,
4175 cx: &App,
4176 ) -> Task<Result<Option<String>>> {
4177 let rx = self.send_job(None, move |state, _| async move {
4178 match state {
4179 RepositoryState::Local { backend, .. } => {
4180 anyhow::Ok(backend.load_index_text(repo_path).await)
4181 }
4182 RepositoryState::Remote { project_id, client } => {
4183 let response = client
4184 .request(proto::OpenUnstagedDiff {
4185 project_id: project_id.to_proto(),
4186 buffer_id: buffer_id.to_proto(),
4187 })
4188 .await?;
4189 Ok(response.staged_text)
4190 }
4191 }
4192 });
4193 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
4194 }
4195
4196 fn load_committed_text(
4197 &mut self,
4198 buffer_id: BufferId,
4199 repo_path: RepoPath,
4200 cx: &App,
4201 ) -> Task<Result<DiffBasesChange>> {
4202 let rx = self.send_job(None, move |state, _| async move {
4203 match state {
4204 RepositoryState::Local { backend, .. } => {
4205 let committed_text = backend.load_committed_text(repo_path.clone()).await;
4206 let staged_text = backend.load_index_text(repo_path).await;
4207 let diff_bases_change = if committed_text == staged_text {
4208 DiffBasesChange::SetBoth(committed_text)
4209 } else {
4210 DiffBasesChange::SetEach {
4211 index: staged_text,
4212 head: committed_text,
4213 }
4214 };
4215 anyhow::Ok(diff_bases_change)
4216 }
4217 RepositoryState::Remote { project_id, client } => {
4218 use proto::open_uncommitted_diff_response::Mode;
4219
4220 let response = client
4221 .request(proto::OpenUncommittedDiff {
4222 project_id: project_id.to_proto(),
4223 buffer_id: buffer_id.to_proto(),
4224 })
4225 .await?;
4226 let mode =
4227 Mode::from_i32(response.mode).ok_or_else(|| anyhow!("Invalid mode"))?;
4228 let bases = match mode {
4229 Mode::IndexMatchesHead => DiffBasesChange::SetBoth(response.committed_text),
4230 Mode::IndexAndHead => DiffBasesChange::SetEach {
4231 head: response.committed_text,
4232 index: response.staged_text,
4233 },
4234 };
4235 Ok(bases)
4236 }
4237 }
4238 });
4239
4240 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
4241 }
4242
4243 fn paths_changed(
4244 &mut self,
4245 paths: Vec<RepoPath>,
4246 updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
4247 cx: &mut Context<Self>,
4248 ) {
4249 self.paths_needing_status_update.extend(paths);
4250
4251 let this = cx.weak_entity();
4252 let _ = self.send_keyed_job(
4253 Some(GitJobKey::RefreshStatuses),
4254 None,
4255 |state, mut cx| async move {
4256 let (prev_snapshot, mut changed_paths) = this.update(&mut cx, |this, _| {
4257 (
4258 this.snapshot.clone(),
4259 mem::take(&mut this.paths_needing_status_update),
4260 )
4261 })?;
4262 let RepositoryState::Local { backend, .. } = state else {
4263 bail!("not a local repository")
4264 };
4265
4266 let paths = changed_paths.iter().cloned().collect::<Vec<_>>();
4267 let statuses = backend.status(&paths).await?;
4268
4269 let changed_path_statuses = cx
4270 .background_spawn(async move {
4271 let mut changed_path_statuses = Vec::new();
4272 let prev_statuses = prev_snapshot.statuses_by_path.clone();
4273 let mut cursor = prev_statuses.cursor::<PathProgress>(&());
4274
4275 for (repo_path, status) in &*statuses.entries {
4276 changed_paths.remove(repo_path);
4277 if cursor.seek_forward(&PathTarget::Path(repo_path), Bias::Left, &()) {
4278 if cursor.item().is_some_and(|entry| entry.status == *status) {
4279 continue;
4280 }
4281 }
4282
4283 changed_path_statuses.push(Edit::Insert(StatusEntry {
4284 repo_path: repo_path.clone(),
4285 status: *status,
4286 }));
4287 }
4288 let mut cursor = prev_statuses.cursor::<PathProgress>(&());
4289 for path in changed_paths.iter() {
4290 if cursor.seek_forward(&PathTarget::Path(&path), Bias::Left, &()) {
4291 changed_path_statuses.push(Edit::Remove(PathKey(path.0.clone())));
4292 }
4293 }
4294 changed_path_statuses
4295 })
4296 .await;
4297
4298 this.update(&mut cx, |this, cx| {
4299 if !changed_path_statuses.is_empty() {
4300 this.snapshot
4301 .statuses_by_path
4302 .edit(changed_path_statuses, &());
4303 this.snapshot.scan_id += 1;
4304 if let Some(updates_tx) = updates_tx {
4305 updates_tx
4306 .unbounded_send(DownstreamUpdate::UpdateRepository(
4307 this.snapshot.clone(),
4308 ))
4309 .ok();
4310 }
4311 }
4312 cx.emit(RepositoryEvent::Updated { full_scan: false });
4313 })
4314 },
4315 );
4316 }
4317
4318 /// currently running git command and when it started
4319 pub fn current_job(&self) -> Option<JobInfo> {
4320 self.active_jobs.values().next().cloned()
4321 }
4322
4323 pub fn barrier(&mut self) -> oneshot::Receiver<()> {
4324 self.send_job(None, |_, _| async {})
4325 }
4326}
4327
4328fn get_permalink_in_rust_registry_src(
4329 provider_registry: Arc<GitHostingProviderRegistry>,
4330 path: PathBuf,
4331 selection: Range<u32>,
4332) -> Result<url::Url> {
4333 #[derive(Deserialize)]
4334 struct CargoVcsGit {
4335 sha1: String,
4336 }
4337
4338 #[derive(Deserialize)]
4339 struct CargoVcsInfo {
4340 git: CargoVcsGit,
4341 path_in_vcs: String,
4342 }
4343
4344 #[derive(Deserialize)]
4345 struct CargoPackage {
4346 repository: String,
4347 }
4348
4349 #[derive(Deserialize)]
4350 struct CargoToml {
4351 package: CargoPackage,
4352 }
4353
4354 let Some((dir, cargo_vcs_info_json)) = path.ancestors().skip(1).find_map(|dir| {
4355 let json = std::fs::read_to_string(dir.join(".cargo_vcs_info.json")).ok()?;
4356 Some((dir, json))
4357 }) else {
4358 bail!("No .cargo_vcs_info.json found in parent directories")
4359 };
4360 let cargo_vcs_info = serde_json::from_str::<CargoVcsInfo>(&cargo_vcs_info_json)?;
4361 let cargo_toml = std::fs::read_to_string(dir.join("Cargo.toml"))?;
4362 let manifest = toml::from_str::<CargoToml>(&cargo_toml)?;
4363 let (provider, remote) = parse_git_remote_url(provider_registry, &manifest.package.repository)
4364 .ok_or_else(|| anyhow!("Failed to parse package.repository field of manifest"))?;
4365 let path = PathBuf::from(cargo_vcs_info.path_in_vcs).join(path.strip_prefix(dir).unwrap());
4366 let permalink = provider.build_permalink(
4367 remote,
4368 BuildPermalinkParams {
4369 sha: &cargo_vcs_info.git.sha1,
4370 path: &path.to_string_lossy(),
4371 selection: Some(selection),
4372 },
4373 );
4374 Ok(permalink)
4375}
4376
4377fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
4378 let Some(blame) = blame else {
4379 return proto::BlameBufferResponse {
4380 blame_response: None,
4381 };
4382 };
4383
4384 let entries = blame
4385 .entries
4386 .into_iter()
4387 .map(|entry| proto::BlameEntry {
4388 sha: entry.sha.as_bytes().into(),
4389 start_line: entry.range.start,
4390 end_line: entry.range.end,
4391 original_line_number: entry.original_line_number,
4392 author: entry.author.clone(),
4393 author_mail: entry.author_mail.clone(),
4394 author_time: entry.author_time,
4395 author_tz: entry.author_tz.clone(),
4396 committer: entry.committer_name.clone(),
4397 committer_mail: entry.committer_email.clone(),
4398 committer_time: entry.committer_time,
4399 committer_tz: entry.committer_tz.clone(),
4400 summary: entry.summary.clone(),
4401 previous: entry.previous.clone(),
4402 filename: entry.filename.clone(),
4403 })
4404 .collect::<Vec<_>>();
4405
4406 let messages = blame
4407 .messages
4408 .into_iter()
4409 .map(|(oid, message)| proto::CommitMessage {
4410 oid: oid.as_bytes().into(),
4411 message,
4412 })
4413 .collect::<Vec<_>>();
4414
4415 proto::BlameBufferResponse {
4416 blame_response: Some(proto::blame_buffer_response::BlameResponse {
4417 entries,
4418 messages,
4419 remote_url: blame.remote_url,
4420 }),
4421 }
4422}
4423
4424fn deserialize_blame_buffer_response(
4425 response: proto::BlameBufferResponse,
4426) -> Option<git::blame::Blame> {
4427 let response = response.blame_response?;
4428 let entries = response
4429 .entries
4430 .into_iter()
4431 .filter_map(|entry| {
4432 Some(git::blame::BlameEntry {
4433 sha: git::Oid::from_bytes(&entry.sha).ok()?,
4434 range: entry.start_line..entry.end_line,
4435 original_line_number: entry.original_line_number,
4436 committer_name: entry.committer,
4437 committer_time: entry.committer_time,
4438 committer_tz: entry.committer_tz,
4439 committer_email: entry.committer_mail,
4440 author: entry.author,
4441 author_mail: entry.author_mail,
4442 author_time: entry.author_time,
4443 author_tz: entry.author_tz,
4444 summary: entry.summary,
4445 previous: entry.previous,
4446 filename: entry.filename,
4447 })
4448 })
4449 .collect::<Vec<_>>();
4450
4451 let messages = response
4452 .messages
4453 .into_iter()
4454 .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
4455 .collect::<HashMap<_, _>>();
4456
4457 Some(Blame {
4458 entries,
4459 messages,
4460 remote_url: response.remote_url,
4461 })
4462}
4463
4464fn branch_to_proto(branch: &git::repository::Branch) -> proto::Branch {
4465 proto::Branch {
4466 is_head: branch.is_head,
4467 name: branch.name.to_string(),
4468 unix_timestamp: branch
4469 .most_recent_commit
4470 .as_ref()
4471 .map(|commit| commit.commit_timestamp as u64),
4472 upstream: branch.upstream.as_ref().map(|upstream| proto::GitUpstream {
4473 ref_name: upstream.ref_name.to_string(),
4474 tracking: upstream
4475 .tracking
4476 .status()
4477 .map(|upstream| proto::UpstreamTracking {
4478 ahead: upstream.ahead as u64,
4479 behind: upstream.behind as u64,
4480 }),
4481 }),
4482 most_recent_commit: branch
4483 .most_recent_commit
4484 .as_ref()
4485 .map(|commit| proto::CommitSummary {
4486 sha: commit.sha.to_string(),
4487 subject: commit.subject.to_string(),
4488 commit_timestamp: commit.commit_timestamp,
4489 }),
4490 }
4491}
4492
4493fn proto_to_branch(proto: &proto::Branch) -> git::repository::Branch {
4494 git::repository::Branch {
4495 is_head: proto.is_head,
4496 name: proto.name.clone().into(),
4497 upstream: proto
4498 .upstream
4499 .as_ref()
4500 .map(|upstream| git::repository::Upstream {
4501 ref_name: upstream.ref_name.to_string().into(),
4502 tracking: upstream
4503 .tracking
4504 .as_ref()
4505 .map(|tracking| {
4506 git::repository::UpstreamTracking::Tracked(UpstreamTrackingStatus {
4507 ahead: tracking.ahead as u32,
4508 behind: tracking.behind as u32,
4509 })
4510 })
4511 .unwrap_or(git::repository::UpstreamTracking::Gone),
4512 }),
4513 most_recent_commit: proto.most_recent_commit.as_ref().map(|commit| {
4514 git::repository::CommitSummary {
4515 sha: commit.sha.to_string().into(),
4516 subject: commit.subject.to_string().into(),
4517 commit_timestamp: commit.commit_timestamp,
4518 has_parent: true,
4519 }
4520 }),
4521 }
4522}
4523
4524fn commit_details_to_proto(commit: &CommitDetails) -> proto::GitCommitDetails {
4525 proto::GitCommitDetails {
4526 sha: commit.sha.to_string(),
4527 message: commit.message.to_string(),
4528 commit_timestamp: commit.commit_timestamp,
4529 author_email: commit.author_email.to_string(),
4530 author_name: commit.author_name.to_string(),
4531 }
4532}
4533
4534fn proto_to_commit_details(proto: &proto::GitCommitDetails) -> CommitDetails {
4535 CommitDetails {
4536 sha: proto.sha.clone().into(),
4537 message: proto.message.clone().into(),
4538 commit_timestamp: proto.commit_timestamp,
4539 author_email: proto.author_email.clone().into(),
4540 author_name: proto.author_name.clone().into(),
4541 }
4542}
4543
4544async fn compute_snapshot(
4545 id: RepositoryId,
4546 work_directory_abs_path: Arc<Path>,
4547 prev_snapshot: RepositorySnapshot,
4548 backend: Arc<dyn GitRepository>,
4549) -> Result<(RepositorySnapshot, Vec<RepositoryEvent>)> {
4550 let mut events = Vec::new();
4551 let branches = backend.branches().await?;
4552 let branch = branches.into_iter().find(|branch| branch.is_head);
4553 let statuses = backend.status(&[WORK_DIRECTORY_REPO_PATH.clone()]).await?;
4554 let statuses_by_path = SumTree::from_iter(
4555 statuses
4556 .entries
4557 .iter()
4558 .map(|(repo_path, status)| StatusEntry {
4559 repo_path: repo_path.clone(),
4560 status: *status,
4561 }),
4562 &(),
4563 );
4564 let (merge_details, merge_heads_changed) =
4565 MergeDetails::load(&backend, &statuses_by_path, &prev_snapshot).await?;
4566
4567 if merge_heads_changed
4568 || branch != prev_snapshot.branch
4569 || statuses_by_path != prev_snapshot.statuses_by_path
4570 {
4571 events.push(RepositoryEvent::Updated { full_scan: true });
4572 }
4573
4574 // Cache merge conflict paths so they don't change from staging/unstaging,
4575 // until the merge heads change (at commit time, etc.).
4576 if merge_heads_changed {
4577 events.push(RepositoryEvent::MergeHeadsChanged);
4578 }
4579
4580 // Useful when branch is None in detached head state
4581 let head_commit = match backend.head_sha() {
4582 Some(head_sha) => backend.show(head_sha).await.log_err(),
4583 None => None,
4584 };
4585
4586 let snapshot = RepositorySnapshot {
4587 id,
4588 statuses_by_path,
4589 work_directory_abs_path,
4590 scan_id: prev_snapshot.scan_id + 1,
4591 branch,
4592 head_commit,
4593 merge: merge_details,
4594 };
4595
4596 Ok((snapshot, events))
4597}
4598
4599fn status_from_proto(
4600 simple_status: i32,
4601 status: Option<proto::GitFileStatus>,
4602) -> anyhow::Result<FileStatus> {
4603 use proto::git_file_status::Variant;
4604
4605 let Some(variant) = status.and_then(|status| status.variant) else {
4606 let code = proto::GitStatus::from_i32(simple_status)
4607 .ok_or_else(|| anyhow!("Invalid git status code: {simple_status}"))?;
4608 let result = match code {
4609 proto::GitStatus::Added => TrackedStatus {
4610 worktree_status: StatusCode::Added,
4611 index_status: StatusCode::Unmodified,
4612 }
4613 .into(),
4614 proto::GitStatus::Modified => TrackedStatus {
4615 worktree_status: StatusCode::Modified,
4616 index_status: StatusCode::Unmodified,
4617 }
4618 .into(),
4619 proto::GitStatus::Conflict => UnmergedStatus {
4620 first_head: UnmergedStatusCode::Updated,
4621 second_head: UnmergedStatusCode::Updated,
4622 }
4623 .into(),
4624 proto::GitStatus::Deleted => TrackedStatus {
4625 worktree_status: StatusCode::Deleted,
4626 index_status: StatusCode::Unmodified,
4627 }
4628 .into(),
4629 _ => return Err(anyhow!("Invalid code for simple status: {simple_status}")),
4630 };
4631 return Ok(result);
4632 };
4633
4634 let result = match variant {
4635 Variant::Untracked(_) => FileStatus::Untracked,
4636 Variant::Ignored(_) => FileStatus::Ignored,
4637 Variant::Unmerged(unmerged) => {
4638 let [first_head, second_head] =
4639 [unmerged.first_head, unmerged.second_head].map(|head| {
4640 let code = proto::GitStatus::from_i32(head)
4641 .ok_or_else(|| anyhow!("Invalid git status code: {head}"))?;
4642 let result = match code {
4643 proto::GitStatus::Added => UnmergedStatusCode::Added,
4644 proto::GitStatus::Updated => UnmergedStatusCode::Updated,
4645 proto::GitStatus::Deleted => UnmergedStatusCode::Deleted,
4646 _ => return Err(anyhow!("Invalid code for unmerged status: {code:?}")),
4647 };
4648 Ok(result)
4649 });
4650 let [first_head, second_head] = [first_head?, second_head?];
4651 UnmergedStatus {
4652 first_head,
4653 second_head,
4654 }
4655 .into()
4656 }
4657 Variant::Tracked(tracked) => {
4658 let [index_status, worktree_status] = [tracked.index_status, tracked.worktree_status]
4659 .map(|status| {
4660 let code = proto::GitStatus::from_i32(status)
4661 .ok_or_else(|| anyhow!("Invalid git status code: {status}"))?;
4662 let result = match code {
4663 proto::GitStatus::Modified => StatusCode::Modified,
4664 proto::GitStatus::TypeChanged => StatusCode::TypeChanged,
4665 proto::GitStatus::Added => StatusCode::Added,
4666 proto::GitStatus::Deleted => StatusCode::Deleted,
4667 proto::GitStatus::Renamed => StatusCode::Renamed,
4668 proto::GitStatus::Copied => StatusCode::Copied,
4669 proto::GitStatus::Unmodified => StatusCode::Unmodified,
4670 _ => return Err(anyhow!("Invalid code for tracked status: {code:?}")),
4671 };
4672 Ok(result)
4673 });
4674 let [index_status, worktree_status] = [index_status?, worktree_status?];
4675 TrackedStatus {
4676 index_status,
4677 worktree_status,
4678 }
4679 .into()
4680 }
4681 };
4682 Ok(result)
4683}
4684
4685fn status_to_proto(status: FileStatus) -> proto::GitFileStatus {
4686 use proto::git_file_status::{Tracked, Unmerged, Variant};
4687
4688 let variant = match status {
4689 FileStatus::Untracked => Variant::Untracked(Default::default()),
4690 FileStatus::Ignored => Variant::Ignored(Default::default()),
4691 FileStatus::Unmerged(UnmergedStatus {
4692 first_head,
4693 second_head,
4694 }) => Variant::Unmerged(Unmerged {
4695 first_head: unmerged_status_to_proto(first_head),
4696 second_head: unmerged_status_to_proto(second_head),
4697 }),
4698 FileStatus::Tracked(TrackedStatus {
4699 index_status,
4700 worktree_status,
4701 }) => Variant::Tracked(Tracked {
4702 index_status: tracked_status_to_proto(index_status),
4703 worktree_status: tracked_status_to_proto(worktree_status),
4704 }),
4705 };
4706 proto::GitFileStatus {
4707 variant: Some(variant),
4708 }
4709}
4710
4711fn unmerged_status_to_proto(code: UnmergedStatusCode) -> i32 {
4712 match code {
4713 UnmergedStatusCode::Added => proto::GitStatus::Added as _,
4714 UnmergedStatusCode::Deleted => proto::GitStatus::Deleted as _,
4715 UnmergedStatusCode::Updated => proto::GitStatus::Updated as _,
4716 }
4717}
4718
4719fn tracked_status_to_proto(code: StatusCode) -> i32 {
4720 match code {
4721 StatusCode::Added => proto::GitStatus::Added as _,
4722 StatusCode::Deleted => proto::GitStatus::Deleted as _,
4723 StatusCode::Modified => proto::GitStatus::Modified as _,
4724 StatusCode::Renamed => proto::GitStatus::Renamed as _,
4725 StatusCode::TypeChanged => proto::GitStatus::TypeChanged as _,
4726 StatusCode::Copied => proto::GitStatus::Copied as _,
4727 StatusCode::Unmodified => proto::GitStatus::Unmodified as _,
4728 }
4729}