1mod conflict_set;
2pub mod git_traversal;
3
4use crate::{
5 ProjectEnvironment, ProjectItem, ProjectPath,
6 buffer_store::{BufferStore, BufferStoreEvent},
7 worktree_store::{WorktreeStore, WorktreeStoreEvent},
8};
9use anyhow::{Context as _, Result, anyhow, bail};
10use askpass::AskPassDelegate;
11use buffer_diff::{BufferDiff, BufferDiffEvent};
12use client::ProjectId;
13use collections::HashMap;
14pub use conflict_set::{ConflictRegion, ConflictSet, ConflictSetSnapshot, ConflictSetUpdate};
15use fs::Fs;
16use futures::{
17 FutureExt, StreamExt as _,
18 channel::{mpsc, oneshot},
19 future::{self, Shared},
20};
21use git::{
22 BuildPermalinkParams, GitHostingProviderRegistry, WORK_DIRECTORY_REPO_PATH,
23 blame::Blame,
24 parse_git_remote_url,
25 repository::{
26 Branch, CommitDetails, CommitDiff, CommitFile, CommitOptions, DiffType, GitRepository,
27 GitRepositoryCheckpoint, PushOptions, Remote, RemoteCommandOutput, RepoPath, ResetMode,
28 UpstreamTrackingStatus,
29 },
30 status::{
31 FileStatus, GitSummary, StatusCode, TrackedStatus, UnmergedStatus, UnmergedStatusCode,
32 },
33};
34use gpui::{
35 App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Subscription, Task,
36 WeakEntity,
37};
38use language::{
39 Buffer, BufferEvent, Language, LanguageRegistry,
40 proto::{deserialize_version, serialize_version},
41};
42use parking_lot::Mutex;
43use postage::stream::Stream as _;
44use rpc::{
45 AnyProtoClient, TypedEnvelope,
46 proto::{self, FromProto, SSH_PROJECT_ID, ToProto, git_reset, split_repository_update},
47};
48use serde::Deserialize;
49use std::{
50 cmp::Ordering,
51 collections::{BTreeSet, VecDeque},
52 future::Future,
53 mem,
54 ops::Range,
55 path::{Path, PathBuf},
56 sync::{
57 Arc,
58 atomic::{self, AtomicU64},
59 },
60 time::Instant,
61};
62use sum_tree::{Edit, SumTree, TreeSet};
63use text::{Bias, BufferId};
64use util::{ResultExt, debug_panic, post_inc};
65use worktree::{
66 File, PathKey, PathProgress, PathSummary, PathTarget, UpdatedGitRepositoriesSet,
67 UpdatedGitRepository, Worktree,
68};
69
/// Store for git-related project state: the known repositories, the
/// per-buffer diff/conflict state, and diffs tracked per remote peer.
pub struct GitStore {
    /// Which mode this store operates in (local, SSH, or remote).
    state: GitStoreState,
    buffer_store: Entity<BufferStore>,
    worktree_store: Entity<WorktreeStore>,
    /// All known repositories, keyed by their id.
    repositories: HashMap<RepositoryId, Entity<Repository>>,
    /// Id of the repository returned by `active_repository`, if any.
    active_repo_id: Option<RepositoryId>,
    /// In-flight diff loads keyed by buffer and diff kind. The tasks are
    /// `Shared`, so repeated requests for the same key await one load.
    #[allow(clippy::type_complexity)]
    loading_diffs:
        HashMap<(BufferId, DiffKind), Shared<Task<Result<Entity<BufferDiff>, Arc<anyhow::Error>>>>>,
    /// Per-buffer git state (diffs and conflict set), keyed by buffer id.
    diffs: HashMap<BufferId, Entity<BufferGitState>>,
    /// Diffs keyed by peer id and then buffer id; cleared in `unshared`.
    shared_diffs: HashMap<proto::PeerId, HashMap<BufferId, SharedDiffs>>,
    /// Event subscriptions that are kept alive for as long as the store lives.
    _subscriptions: Vec<Subscription>,
}
83
/// Strong handles to the diffs of a single buffer, stored per peer in
/// `GitStore::shared_diffs`.
#[derive(Default)]
struct SharedDiffs {
    unstaged: Option<Entity<BufferDiff>>,
    uncommitted: Option<Entity<BufferDiff>>,
}
89
/// Git-related state tracked for a single buffer: weak handles to its diffs
/// and conflict set, the base texts, and bookkeeping for recalculation.
struct BufferGitState {
    unstaged_diff: Option<WeakEntity<BufferDiff>>,
    uncommitted_diff: Option<WeakEntity<BufferDiff>>,
    conflict_set: Option<WeakEntity<ConflictSet>>,
    recalculate_diff_task: Option<Task<Result<()>>>,
    reparse_conflict_markers_task: Option<Task<Result<()>>>,
    language: Option<Arc<Language>>,
    language_registry: Option<Arc<LanguageRegistry>>,
    conflict_updated_futures: Vec<oneshot::Sender<()>>,
    recalculating_tx: postage::watch::Sender<bool>,

    /// These operation counts are used to ensure that head and index text
    /// values read from the git repository are up-to-date with any hunk staging
    /// operations that have been performed on the BufferDiff.
    ///
    /// The operation count is incremented immediately when the user initiates a
    /// hunk stage/unstage operation. Then, upon finishing writing the new index
    /// text to disk, the `operation count as of write` is updated to reflect
    /// the operation count that prompted the write.
    hunk_staging_operation_count: usize,
    hunk_staging_operation_count_as_of_write: usize,

    head_text: Option<Arc<String>>,
    index_text: Option<Arc<String>>,
    head_changed: bool,
    index_changed: bool,
    language_changed: bool,
}
118
/// Describes which diff base texts (index and/or head) are being replaced.
#[derive(Clone, Debug)]
enum DiffBasesChange {
    /// Replace only the index text.
    SetIndex(Option<String>),
    /// Replace only the head text.
    SetHead(Option<String>),
    /// Replace the index and head texts with separate values.
    SetEach {
        index: Option<String>,
        head: Option<String>,
    },
    // NOTE(review): presumably one text applied to both index and head — confirm
    // against the code that consumes this variant.
    SetBoth(Option<String>),
}
129
/// Selects which of a buffer's diffs an operation refers to. Used as part of
/// the key of `GitStore::loading_diffs`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum DiffKind {
    Unstaged,
    Uncommitted,
}
135
/// The mode a `GitStore` operates in, together with the connections and
/// resources that mode needs.
enum GitStoreState {
    /// Created by `GitStore::local`.
    Local {
        /// Shared counter for repository ids; starts at 1.
        next_repository_id: Arc<AtomicU64>,
        /// Set while the store is shared with a downstream client.
        downstream: Option<LocalDownstreamState>,
        project_environment: Entity<ProjectEnvironment>,
        fs: Arc<dyn Fs>,
    },
    /// Created by `GitStore::ssh`; has an upstream client and may also be
    /// shared with a downstream client.
    Ssh {
        upstream_client: AnyProtoClient,
        upstream_project_id: ProjectId,
        downstream: Option<(AnyProtoClient, ProjectId)>,
    },
    /// Created by `GitStore::remote`; has an upstream client only.
    Remote {
        upstream_client: AnyProtoClient,
        upstream_project_id: ProjectId,
    },
}
153
/// Message sent over `LocalDownstreamState::updates_tx` to the task that
/// forwards repository changes to the downstream client.
enum DownstreamUpdate {
    /// A new snapshot of a repository; forwarded as an initial or incremental update.
    UpdateRepository(RepositorySnapshot),
    /// The repository with this id should be removed downstream.
    RemoveRepository(RepositoryId),
}
158
/// Sharing state of a local `GitStore`: the downstream client plus the channel
/// and task used to forward repository updates to it.
struct LocalDownstreamState {
    client: AnyProtoClient,
    project_id: ProjectId,
    /// Sender half of the channel drained by `_task`.
    updates_tx: mpsc::UnboundedSender<DownstreamUpdate>,
    /// Forwarding task; stored here so it is owned by this state.
    _task: Task<Result<()>>,
}
165
/// A set of per-repository checkpoints, keyed by each repository's absolute
/// work directory path. Produced by `GitStore::checkpoint`.
#[derive(Clone)]
pub struct GitStoreCheckpoint {
    checkpoints_by_work_dir_abs_path: HashMap<Arc<Path>, GitRepositoryCheckpoint>,
}
170
/// The git status of a single path within a repository.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StatusEntry {
    /// Path of the entry, used as the entry's key and summary path.
    pub repo_path: RepoPath,
    /// Status recorded for that path.
    pub status: FileStatus,
}
176
177impl StatusEntry {
178 fn to_proto(&self) -> proto::StatusEntry {
179 let simple_status = match self.status {
180 FileStatus::Ignored | FileStatus::Untracked => proto::GitStatus::Added as i32,
181 FileStatus::Unmerged { .. } => proto::GitStatus::Conflict as i32,
182 FileStatus::Tracked(TrackedStatus {
183 index_status,
184 worktree_status,
185 }) => tracked_status_to_proto(if worktree_status != StatusCode::Unmodified {
186 worktree_status
187 } else {
188 index_status
189 }),
190 };
191
192 proto::StatusEntry {
193 repo_path: self.repo_path.as_ref().to_proto(),
194 simple_status,
195 status: Some(status_to_proto(self.status)),
196 }
197 }
198}
199
200impl TryFrom<proto::StatusEntry> for StatusEntry {
201 type Error = anyhow::Error;
202
203 fn try_from(value: proto::StatusEntry) -> Result<Self, Self::Error> {
204 let repo_path = RepoPath(Arc::<Path>::from_proto(value.repo_path));
205 let status = status_from_proto(value.simple_status, value.status)?;
206 Ok(Self { repo_path, status })
207 }
208}
209
210impl sum_tree::Item for StatusEntry {
211 type Summary = PathSummary<GitSummary>;
212
213 fn summary(&self, _: &<Self::Summary as sum_tree::Summary>::Context) -> Self::Summary {
214 PathSummary {
215 max_path: self.repo_path.0.clone(),
216 item_summary: self.status.summary(),
217 }
218 }
219}
220
221impl sum_tree::KeyedItem for StatusEntry {
222 type Key = PathKey;
223
224 fn key(&self) -> Self::Key {
225 PathKey(self.repo_path.0.clone())
226 }
227}
228
/// Numeric identifier of a repository within a `GitStore`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct RepositoryId(pub u64);
231
/// Merge-related information carried by a repository snapshot.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct MergeDetails {
    /// Paths consulted to decide whether a buffer's file is in conflict.
    pub conflicted_paths: TreeSet<RepoPath>,
    // NOTE(review): presumably the pending merge message — confirm.
    pub message: Option<SharedString>,
    // NOTE(review): presumably the merge heads involved in the merge — confirm.
    pub heads: Vec<Option<SharedString>>,
}
238
/// The observable state of a repository. `Repository` dereferences to this
/// type, and it is the unit sent to downstream clients.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RepositorySnapshot {
    pub id: RepositoryId,
    /// Status entries stored in a sum tree keyed by path.
    pub statuses_by_path: SumTree<StatusEntry>,
    /// Absolute path of the repository's work directory.
    pub work_directory_abs_path: Arc<Path>,
    pub branch: Option<Branch>,
    pub head_commit: Option<CommitDetails>,
    pub scan_id: u64,
    pub merge: MergeDetails,
}
249
/// Identifier used as the key of `Repository::active_jobs`.
type JobId = u64;
251
/// Descriptive information about a job tracked in `Repository::active_jobs`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct JobInfo {
    // NOTE(review): presumably the instant the job started — confirm at the creation site.
    pub start: Instant,
    pub message: SharedString,
}
257
/// A git repository known to the `GitStore`. Dereferences to its current
/// `RepositorySnapshot`.
pub struct Repository {
    /// Weak handle to this repository's own entity.
    this: WeakEntity<Self>,
    /// Current snapshot; the target of the `Deref` implementation.
    snapshot: RepositorySnapshot,
    commit_message_buffer: Option<Entity<Buffer>>,
    git_store: WeakEntity<GitStore>,
    // For a local repository, holds paths that have had worktree events since the last status scan completed,
    // and that should be examined during the next status scan.
    paths_needing_status_update: BTreeSet<RepoPath>,
    /// Sender half of the channel carrying `GitJob`s.
    job_sender: mpsc::UnboundedSender<GitJob>,
    /// Jobs tracked by id together with their descriptive info.
    active_jobs: HashMap<JobId, JobInfo>,
    job_id: JobId,
    askpass_delegates: Arc<Mutex<HashMap<u64, AskPassDelegate>>>,
    latest_askpass_id: u64,
}
272
273impl std::ops::Deref for Repository {
274 type Target = RepositorySnapshot;
275
276 fn deref(&self) -> &Self::Target {
277 &self.snapshot
278 }
279}
280
/// The backend a git job runs against; handed to every `GitJob` closure.
#[derive(Clone)]
pub enum RepositoryState {
    /// Operations go directly to a git backend.
    Local {
        backend: Arc<dyn GitRepository>,
        environment: Arc<HashMap<String, String>>,
    },
    /// Operations are sent as requests through `client` for `project_id`.
    Remote {
        project_id: ProjectId,
        client: AnyProtoClient,
    },
}
292
/// Events emitted by a `Repository`.
#[derive(Clone, Debug)]
pub enum RepositoryEvent {
    // NOTE(review): `full_scan` presumably distinguishes a complete status scan
    // from a partial one — confirm at the emit sites.
    Updated { full_scan: bool },
    MergeHeadsChanged,
}
298
/// Marker event emitted by a `Repository` (see its `EventEmitter` impl).
#[derive(Clone, Debug)]
pub struct JobsUpdated;
301
/// Events emitted by the `GitStore`.
#[derive(Debug)]
pub enum GitStoreEvent {
    ActiveRepositoryChanged(Option<RepositoryId>),
    // NOTE(review): the meaning of the trailing `bool` is not visible here —
    // presumably whether the repository is the active one; confirm at the emit site.
    RepositoryUpdated(RepositoryId, RepositoryEvent, bool),
    RepositoryAdded(RepositoryId),
    RepositoryRemoved(RepositoryId),
    IndexWriteError(anyhow::Error),
    JobsUpdated,
    /// Emitted whenever a conflict set opened through the store emits an event.
    ConflictsUpdated,
}
312
// Declares which event types each entity can emit.
impl EventEmitter<RepositoryEvent> for Repository {}
impl EventEmitter<JobsUpdated> for Repository {}
impl EventEmitter<GitStoreEvent> for GitStore {}
316
/// A unit of work sent over `Repository::job_sender`.
pub struct GitJob {
    /// The work itself; receives the repository state and returns a task.
    job: Box<dyn FnOnce(RepositoryState, &mut AsyncApp) -> Task<()>>,
    /// Optional key describing the kind of job.
    key: Option<GitJobKey>,
}
321
/// Identifies the kind of a `GitJob`.
// NOTE(review): presumably used to coalesce queued jobs that share a key —
// confirm against the job queue implementation.
#[derive(PartialEq, Eq)]
enum GitJobKey {
    WriteIndex(RepoPath),
    ReloadBufferDiffBases,
    RefreshStatuses,
    ReloadGitState,
}
329
330impl GitStore {
331 pub fn local(
332 worktree_store: &Entity<WorktreeStore>,
333 buffer_store: Entity<BufferStore>,
334 environment: Entity<ProjectEnvironment>,
335 fs: Arc<dyn Fs>,
336 cx: &mut Context<Self>,
337 ) -> Self {
338 Self::new(
339 worktree_store.clone(),
340 buffer_store,
341 GitStoreState::Local {
342 next_repository_id: Arc::new(AtomicU64::new(1)),
343 downstream: None,
344 project_environment: environment,
345 fs,
346 },
347 cx,
348 )
349 }
350
351 pub fn remote(
352 worktree_store: &Entity<WorktreeStore>,
353 buffer_store: Entity<BufferStore>,
354 upstream_client: AnyProtoClient,
355 project_id: ProjectId,
356 cx: &mut Context<Self>,
357 ) -> Self {
358 Self::new(
359 worktree_store.clone(),
360 buffer_store,
361 GitStoreState::Remote {
362 upstream_client,
363 upstream_project_id: project_id,
364 },
365 cx,
366 )
367 }
368
369 pub fn ssh(
370 worktree_store: &Entity<WorktreeStore>,
371 buffer_store: Entity<BufferStore>,
372 upstream_client: AnyProtoClient,
373 cx: &mut Context<Self>,
374 ) -> Self {
375 Self::new(
376 worktree_store.clone(),
377 buffer_store,
378 GitStoreState::Ssh {
379 upstream_client,
380 upstream_project_id: ProjectId(SSH_PROJECT_ID),
381 downstream: None,
382 },
383 cx,
384 )
385 }
386
387 fn new(
388 worktree_store: Entity<WorktreeStore>,
389 buffer_store: Entity<BufferStore>,
390 state: GitStoreState,
391 cx: &mut Context<Self>,
392 ) -> Self {
393 let _subscriptions = vec![
394 cx.subscribe(&worktree_store, Self::on_worktree_store_event),
395 cx.subscribe(&buffer_store, Self::on_buffer_store_event),
396 ];
397
398 GitStore {
399 state,
400 buffer_store,
401 worktree_store,
402 repositories: HashMap::default(),
403 active_repo_id: None,
404 _subscriptions,
405 loading_diffs: HashMap::default(),
406 shared_diffs: HashMap::default(),
407 diffs: HashMap::default(),
408 }
409 }
410
    /// Registers the store's RPC handlers on `client`: request handlers for
    /// git operations and diff opening, and message handlers for diff-base and
    /// repository updates.
    pub fn init(client: &AnyProtoClient) {
        client.add_entity_request_handler(Self::handle_get_remotes);
        client.add_entity_request_handler(Self::handle_get_branches);
        client.add_entity_request_handler(Self::handle_change_branch);
        client.add_entity_request_handler(Self::handle_create_branch);
        client.add_entity_request_handler(Self::handle_git_init);
        client.add_entity_request_handler(Self::handle_push);
        client.add_entity_request_handler(Self::handle_pull);
        client.add_entity_request_handler(Self::handle_fetch);
        client.add_entity_request_handler(Self::handle_stage);
        client.add_entity_request_handler(Self::handle_unstage);
        client.add_entity_request_handler(Self::handle_commit);
        client.add_entity_request_handler(Self::handle_reset);
        client.add_entity_request_handler(Self::handle_show);
        client.add_entity_request_handler(Self::handle_load_commit_diff);
        client.add_entity_request_handler(Self::handle_checkout_files);
        client.add_entity_request_handler(Self::handle_open_commit_message_buffer);
        client.add_entity_request_handler(Self::handle_set_index_text);
        client.add_entity_request_handler(Self::handle_askpass);
        client.add_entity_request_handler(Self::handle_check_for_pushed_commits);
        client.add_entity_request_handler(Self::handle_git_diff);
        client.add_entity_request_handler(Self::handle_open_unstaged_diff);
        client.add_entity_request_handler(Self::handle_open_uncommitted_diff);
        // Fire-and-forget message (no response expected).
        client.add_entity_message_handler(Self::handle_update_diff_bases);
        client.add_entity_request_handler(Self::handle_get_permalink_to_line);
        client.add_entity_request_handler(Self::handle_blame_buffer);
        // Repository snapshot updates/removals arrive as messages as well.
        client.add_entity_message_handler(Self::handle_update_repository);
        client.add_entity_message_handler(Self::handle_remove_repository);
    }
440
441 pub fn is_local(&self) -> bool {
442 matches!(self.state, GitStoreState::Local { .. })
443 }
444
    /// Starts sharing repository state with a downstream `client` under
    /// `project_id`.
    ///
    /// * SSH mode: sends an initial update for every repository right away
    ///   (send errors are logged) and records the downstream client.
    /// * Local mode: queues the current snapshot of every repository on a
    ///   channel and spawns a task that forwards queued updates to the client.
    /// * Remote mode: not supported; triggers a `debug_panic!`.
    pub fn shared(&mut self, project_id: u64, client: AnyProtoClient, cx: &mut Context<Self>) {
        match &mut self.state {
            GitStoreState::Ssh {
                downstream: downstream_client,
                ..
            } => {
                for repo in self.repositories.values() {
                    let update = repo.read(cx).snapshot.initial_update(project_id);
                    for update in split_repository_update(update) {
                        client.send(update).log_err();
                    }
                }
                *downstream_client = Some((client, ProjectId(project_id)));
            }
            GitStoreState::Local {
                downstream: downstream_client,
                ..
            } => {
                // Last snapshot sent per repository, used to build incremental updates.
                let mut snapshots = HashMap::default();
                let (updates_tx, mut updates_rx) = mpsc::unbounded();
                // Seed the channel with the current state of every repository.
                for repo in self.repositories.values() {
                    updates_tx
                        .unbounded_send(DownstreamUpdate::UpdateRepository(
                            repo.read(cx).snapshot.clone(),
                        ))
                        .ok();
                }
                *downstream_client = Some(LocalDownstreamState {
                    client: client.clone(),
                    project_id: ProjectId(project_id),
                    updates_tx,
                    _task: cx.spawn(async move |this, cx| {
                        cx.background_spawn(async move {
                            while let Some(update) = updates_rx.next().await {
                                match update {
                                    DownstreamUpdate::UpdateRepository(snapshot) => {
                                        if let Some(old_snapshot) = snapshots.get_mut(&snapshot.id)
                                        {
                                            // Already sent once: send only the delta.
                                            let update =
                                                snapshot.build_update(old_snapshot, project_id);
                                            *old_snapshot = snapshot;
                                            for update in split_repository_update(update) {
                                                client.send(update)?;
                                            }
                                        } else {
                                            // First time this repository is seen: send it in full.
                                            let update = snapshot.initial_update(project_id);
                                            for update in split_repository_update(update) {
                                                client.send(update)?;
                                            }
                                            snapshots.insert(snapshot.id, snapshot);
                                        }
                                    }
                                    DownstreamUpdate::RemoveRepository(id) => {
                                        client.send(proto::RemoveRepository {
                                            project_id,
                                            id: id.to_proto(),
                                        })?;
                                    }
                                }
                            }
                            anyhow::Ok(())
                        })
                        .await
                        .ok();
                        // The forwarding loop ended (channel closed or a send failed):
                        // clear the downstream state.
                        this.update(cx, |this, _| {
                            if let GitStoreState::Local {
                                downstream: downstream_client,
                                ..
                            } = &mut this.state
                            {
                                downstream_client.take();
                            } else {
                                unreachable!("unshared called on remote store");
                            }
                        })
                    }),
                });
            }
            GitStoreState::Remote { .. } => {
                debug_panic!("shared called on remote store");
            }
        }
    }
528
529 pub fn unshared(&mut self, _cx: &mut Context<Self>) {
530 match &mut self.state {
531 GitStoreState::Local {
532 downstream: downstream_client,
533 ..
534 } => {
535 downstream_client.take();
536 }
537 GitStoreState::Ssh {
538 downstream: downstream_client,
539 ..
540 } => {
541 downstream_client.take();
542 }
543 GitStoreState::Remote { .. } => {
544 debug_panic!("unshared called on remote store");
545 }
546 }
547 self.shared_diffs.clear();
548 }
549
550 pub(crate) fn forget_shared_diffs_for(&mut self, peer_id: &proto::PeerId) {
551 self.shared_diffs.remove(peer_id);
552 }
553
554 pub fn active_repository(&self) -> Option<Entity<Repository>> {
555 self.active_repo_id
556 .as_ref()
557 .map(|id| self.repositories[&id].clone())
558 }
559
560 pub fn open_unstaged_diff(
561 &mut self,
562 buffer: Entity<Buffer>,
563 cx: &mut Context<Self>,
564 ) -> Task<Result<Entity<BufferDiff>>> {
565 let buffer_id = buffer.read(cx).remote_id();
566 if let Some(diff_state) = self.diffs.get(&buffer_id) {
567 if let Some(unstaged_diff) = diff_state
568 .read(cx)
569 .unstaged_diff
570 .as_ref()
571 .and_then(|weak| weak.upgrade())
572 {
573 if let Some(task) =
574 diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
575 {
576 return cx.background_executor().spawn(async move {
577 task.await;
578 Ok(unstaged_diff)
579 });
580 }
581 return Task::ready(Ok(unstaged_diff));
582 }
583 }
584
585 let Some((repo, repo_path)) =
586 self.repository_and_path_for_buffer_id(buffer.read(cx).remote_id(), cx)
587 else {
588 return Task::ready(Err(anyhow!("failed to find git repository for buffer")));
589 };
590
591 let task = self
592 .loading_diffs
593 .entry((buffer_id, DiffKind::Unstaged))
594 .or_insert_with(|| {
595 let staged_text = repo.update(cx, |repo, cx| {
596 repo.load_staged_text(buffer_id, repo_path, cx)
597 });
598 cx.spawn(async move |this, cx| {
599 Self::open_diff_internal(
600 this,
601 DiffKind::Unstaged,
602 staged_text.await.map(DiffBasesChange::SetIndex),
603 buffer,
604 cx,
605 )
606 .await
607 .map_err(Arc::new)
608 })
609 .shared()
610 })
611 .clone();
612
613 cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
614 }
615
616 pub fn open_uncommitted_diff(
617 &mut self,
618 buffer: Entity<Buffer>,
619 cx: &mut Context<Self>,
620 ) -> Task<Result<Entity<BufferDiff>>> {
621 let buffer_id = buffer.read(cx).remote_id();
622
623 if let Some(diff_state) = self.diffs.get(&buffer_id) {
624 if let Some(uncommitted_diff) = diff_state
625 .read(cx)
626 .uncommitted_diff
627 .as_ref()
628 .and_then(|weak| weak.upgrade())
629 {
630 if let Some(task) =
631 diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
632 {
633 return cx.background_executor().spawn(async move {
634 task.await;
635 Ok(uncommitted_diff)
636 });
637 }
638 return Task::ready(Ok(uncommitted_diff));
639 }
640 }
641
642 let Some((repo, repo_path)) =
643 self.repository_and_path_for_buffer_id(buffer.read(cx).remote_id(), cx)
644 else {
645 return Task::ready(Err(anyhow!("failed to find git repository for buffer")));
646 };
647
648 let task = self
649 .loading_diffs
650 .entry((buffer_id, DiffKind::Uncommitted))
651 .or_insert_with(|| {
652 let changes = repo.update(cx, |repo, cx| {
653 repo.load_committed_text(buffer_id, repo_path, cx)
654 });
655
656 cx.spawn(async move |this, cx| {
657 Self::open_diff_internal(this, DiffKind::Uncommitted, changes.await, buffer, cx)
658 .await
659 .map_err(Arc::new)
660 })
661 .shared()
662 })
663 .clone();
664
665 cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
666 }
667
    /// Finishes opening a diff of the given `kind` once its base texts have
    /// been loaded.
    ///
    /// On a load error the pending entry is removed from `loading_diffs` and
    /// the error is returned. On success a new `BufferDiff` is created and
    /// registered (weakly) in the buffer's `BufferGitState`, the new bases are
    /// applied, and the diff is returned once recalculation has finished.
    async fn open_diff_internal(
        this: WeakEntity<Self>,
        kind: DiffKind,
        texts: Result<DiffBasesChange>,
        buffer_entity: Entity<Buffer>,
        cx: &mut AsyncApp,
    ) -> Result<Entity<BufferDiff>> {
        let diff_bases_change = match texts {
            Err(e) => {
                // Clear the pending entry so a later call can retry the load.
                this.update(cx, |this, cx| {
                    let buffer = buffer_entity.read(cx);
                    let buffer_id = buffer.remote_id();
                    this.loading_diffs.remove(&(buffer_id, kind));
                })?;
                return Err(e);
            }
            Ok(change) => change,
        };

        this.update(cx, |this, cx| {
            let buffer = buffer_entity.read(cx);
            let buffer_id = buffer.remote_id();
            let language = buffer.language().cloned();
            let language_registry = buffer.language_registry();
            let text_snapshot = buffer.text_snapshot();
            this.loading_diffs.remove(&(buffer_id, kind));

            let git_store = cx.weak_entity();
            // Reuse the buffer's git state if it exists, otherwise create it.
            let diff_state = this
                .diffs
                .entry(buffer_id)
                .or_insert_with(|| cx.new(|_| BufferGitState::new(git_store)));

            let diff = cx.new(|cx| BufferDiff::new(&text_snapshot, cx));

            cx.subscribe(&diff, Self::on_buffer_diff_event).detach();
            diff_state.update(cx, |diff_state, cx| {
                diff_state.language = language;
                diff_state.language_registry = language_registry;

                match kind {
                    DiffKind::Unstaged => diff_state.unstaged_diff = Some(diff.downgrade()),
                    DiffKind::Uncommitted => {
                        // The uncommitted diff uses the unstaged diff as its
                        // secondary diff; create one if none is alive.
                        let unstaged_diff = if let Some(diff) = diff_state.unstaged_diff() {
                            diff
                        } else {
                            let unstaged_diff = cx.new(|cx| BufferDiff::new(&text_snapshot, cx));
                            diff_state.unstaged_diff = Some(unstaged_diff.downgrade());
                            unstaged_diff
                        };

                        diff.update(cx, |diff, _| diff.set_secondary_diff(unstaged_diff));
                        diff_state.uncommitted_diff = Some(diff.downgrade())
                    }
                }

                diff_state.diff_bases_changed(text_snapshot, Some(diff_bases_change), cx);
                let rx = diff_state.wait_for_recalculation();

                // Hand back a future so the wait happens outside of the update closure.
                anyhow::Ok(async move {
                    if let Some(rx) = rx {
                        rx.await;
                    }
                    Ok(diff)
                })
            })
        })??
        .await
    }
737
738 pub fn get_unstaged_diff(&self, buffer_id: BufferId, cx: &App) -> Option<Entity<BufferDiff>> {
739 let diff_state = self.diffs.get(&buffer_id)?;
740 diff_state.read(cx).unstaged_diff.as_ref()?.upgrade()
741 }
742
743 pub fn get_uncommitted_diff(
744 &self,
745 buffer_id: BufferId,
746 cx: &App,
747 ) -> Option<Entity<BufferDiff>> {
748 let diff_state = self.diffs.get(&buffer_id)?;
749 diff_state.read(cx).uncommitted_diff.as_ref()?.upgrade()
750 }
751
752 pub fn open_conflict_set(
753 &mut self,
754 buffer: Entity<Buffer>,
755 cx: &mut Context<Self>,
756 ) -> Entity<ConflictSet> {
757 log::debug!("open conflict set");
758 let buffer_id = buffer.read(cx).remote_id();
759
760 if let Some(git_state) = self.diffs.get(&buffer_id) {
761 if let Some(conflict_set) = git_state
762 .read(cx)
763 .conflict_set
764 .as_ref()
765 .and_then(|weak| weak.upgrade())
766 {
767 let conflict_set = conflict_set.clone();
768 let buffer_snapshot = buffer.read(cx).text_snapshot();
769
770 git_state.update(cx, |state, cx| {
771 let _ = state.reparse_conflict_markers(buffer_snapshot, cx);
772 });
773
774 return conflict_set;
775 }
776 }
777
778 let is_unmerged = self
779 .repository_and_path_for_buffer_id(buffer_id, cx)
780 .map_or(false, |(repo, path)| {
781 repo.read(cx)
782 .snapshot
783 .merge
784 .conflicted_paths
785 .contains(&path)
786 });
787 let git_store = cx.weak_entity();
788 let buffer_git_state = self
789 .diffs
790 .entry(buffer_id)
791 .or_insert_with(|| cx.new(|_| BufferGitState::new(git_store)));
792 let conflict_set = cx.new(|cx| ConflictSet::new(buffer_id, is_unmerged, cx));
793
794 self._subscriptions
795 .push(cx.subscribe(&conflict_set, |_, _, _, cx| {
796 cx.emit(GitStoreEvent::ConflictsUpdated);
797 }));
798
799 buffer_git_state.update(cx, |state, cx| {
800 state.conflict_set = Some(conflict_set.downgrade());
801 let buffer_snapshot = buffer.read(cx).text_snapshot();
802 let _ = state.reparse_conflict_markers(buffer_snapshot, cx);
803 });
804
805 conflict_set
806 }
807
808 pub fn project_path_git_status(
809 &self,
810 project_path: &ProjectPath,
811 cx: &App,
812 ) -> Option<FileStatus> {
813 let (repo, repo_path) = self.repository_and_path_for_project_path(project_path, cx)?;
814 Some(repo.read(cx).status_for_path(&repo_path)?.status)
815 }
816
817 pub fn checkpoint(&self, cx: &mut App) -> Task<Result<GitStoreCheckpoint>> {
818 let mut work_directory_abs_paths = Vec::new();
819 let mut checkpoints = Vec::new();
820 for repository in self.repositories.values() {
821 repository.update(cx, |repository, _| {
822 work_directory_abs_paths.push(repository.snapshot.work_directory_abs_path.clone());
823 checkpoints.push(repository.checkpoint().map(|checkpoint| checkpoint?));
824 });
825 }
826
827 cx.background_executor().spawn(async move {
828 let checkpoints = future::try_join_all(checkpoints).await?;
829 Ok(GitStoreCheckpoint {
830 checkpoints_by_work_dir_abs_path: work_directory_abs_paths
831 .into_iter()
832 .zip(checkpoints)
833 .collect(),
834 })
835 })
836 }
837
838 pub fn restore_checkpoint(
839 &self,
840 checkpoint: GitStoreCheckpoint,
841 cx: &mut App,
842 ) -> Task<Result<()>> {
843 let repositories_by_work_dir_abs_path = self
844 .repositories
845 .values()
846 .map(|repo| (repo.read(cx).snapshot.work_directory_abs_path.clone(), repo))
847 .collect::<HashMap<_, _>>();
848
849 let mut tasks = Vec::new();
850 for (work_dir_abs_path, checkpoint) in checkpoint.checkpoints_by_work_dir_abs_path {
851 if let Some(repository) = repositories_by_work_dir_abs_path.get(&work_dir_abs_path) {
852 let restore = repository.update(cx, |repository, _| {
853 repository.restore_checkpoint(checkpoint)
854 });
855 tasks.push(async move { restore.await? });
856 }
857 }
858 cx.background_spawn(async move {
859 future::try_join_all(tasks).await?;
860 Ok(())
861 })
862 }
863
864 /// Compares two checkpoints, returning true if they are equal.
865 pub fn compare_checkpoints(
866 &self,
867 left: GitStoreCheckpoint,
868 mut right: GitStoreCheckpoint,
869 cx: &mut App,
870 ) -> Task<Result<bool>> {
871 let repositories_by_work_dir_abs_path = self
872 .repositories
873 .values()
874 .map(|repo| (repo.read(cx).snapshot.work_directory_abs_path.clone(), repo))
875 .collect::<HashMap<_, _>>();
876
877 let mut tasks = Vec::new();
878 for (work_dir_abs_path, left_checkpoint) in left.checkpoints_by_work_dir_abs_path {
879 if let Some(right_checkpoint) = right
880 .checkpoints_by_work_dir_abs_path
881 .remove(&work_dir_abs_path)
882 {
883 if let Some(repository) = repositories_by_work_dir_abs_path.get(&work_dir_abs_path)
884 {
885 let compare = repository.update(cx, |repository, _| {
886 repository.compare_checkpoints(left_checkpoint, right_checkpoint)
887 });
888
889 tasks.push(async move { compare.await? });
890 }
891 } else {
892 return Task::ready(Ok(false));
893 }
894 }
895 cx.background_spawn(async move {
896 Ok(future::try_join_all(tasks)
897 .await?
898 .into_iter()
899 .all(|result| result))
900 })
901 }
902
903 /// Blames a buffer.
904 pub fn blame_buffer(
905 &self,
906 buffer: &Entity<Buffer>,
907 version: Option<clock::Global>,
908 cx: &mut App,
909 ) -> Task<Result<Option<Blame>>> {
910 let buffer = buffer.read(cx);
911 let Some((repo, repo_path)) =
912 self.repository_and_path_for_buffer_id(buffer.remote_id(), cx)
913 else {
914 return Task::ready(Err(anyhow!("failed to find a git repository for buffer")));
915 };
916 let content = match &version {
917 Some(version) => buffer.rope_for_version(version).clone(),
918 None => buffer.as_rope().clone(),
919 };
920 let version = version.unwrap_or(buffer.version());
921 let buffer_id = buffer.remote_id();
922
923 let rx = repo.update(cx, |repo, _| {
924 repo.send_job(None, move |state, _| async move {
925 match state {
926 RepositoryState::Local { backend, .. } => backend
927 .blame(repo_path.clone(), content)
928 .await
929 .with_context(|| format!("Failed to blame {:?}", repo_path.0))
930 .map(Some),
931 RepositoryState::Remote { project_id, client } => {
932 let response = client
933 .request(proto::BlameBuffer {
934 project_id: project_id.to_proto(),
935 buffer_id: buffer_id.into(),
936 version: serialize_version(&version),
937 })
938 .await?;
939 Ok(deserialize_blame_buffer_response(response))
940 }
941 }
942 })
943 });
944
945 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
946 }
947
    /// Builds a permalink URL for the `selection` range in `buffer`.
    ///
    /// Fails if the buffer has no file. When the file is not inside a known
    /// repository, the only fallback is for Rust buffers, which are resolved
    /// via `get_permalink_in_rust_registry_src`. Otherwise a local repository
    /// builds the link from its remote URL and HEAD SHA, and a remote
    /// repository asks upstream with a `GetPermalinkToLine` request.
    pub fn get_permalink_to_line(
        &self,
        buffer: &Entity<Buffer>,
        selection: Range<u32>,
        cx: &mut App,
    ) -> Task<Result<url::Url>> {
        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        let Some((repo, repo_path)) = self.repository_and_path_for_project_path(
            &(file.worktree.read(cx).id(), file.path.clone()).into(),
            cx,
        ) else {
            // If we're not in a Git repo, check whether this is a Rust source
            // file in the Cargo registry (presumably opened with go-to-definition
            // from a normal Rust file). If so, we can put together a permalink
            // using crate metadata.
            if buffer
                .read(cx)
                .language()
                .is_none_or(|lang| lang.name() != "Rust".into())
            {
                return Task::ready(Err(anyhow!("no permalink available")));
            }
            let Some(file_path) = file.worktree.read(cx).absolutize(&file.path).ok() else {
                return Task::ready(Err(anyhow!("no permalink available")));
            };
            return cx.spawn(async move |cx| {
                let provider_registry = cx.update(GitHostingProviderRegistry::default_global)?;
                get_permalink_in_rust_registry_src(provider_registry, file_path, selection)
                    .map_err(|_| anyhow!("no permalink available"))
            });

            // TODO remote case
        };

        let buffer_id = buffer.read(cx).remote_id();
        let branch = repo.read(cx).branch.clone();
        // Use the remote of the branch's upstream, defaulting to "origin".
        let remote = branch
            .as_ref()
            .and_then(|b| b.upstream.as_ref())
            .and_then(|b| b.remote_name())
            .unwrap_or("origin")
            .to_string();

        let rx = repo.update(cx, |repo, _| {
            repo.send_job(None, move |state, cx| async move {
                match state {
                    RepositoryState::Local { backend, .. } => {
                        let origin_url = backend
                            .remote_url(&remote)
                            .ok_or_else(|| anyhow!("remote \"{remote}\" not found"))?;

                        let sha = backend
                            .head_sha()
                            .await
                            .ok_or_else(|| anyhow!("failed to read HEAD SHA"))?;

                        let provider_registry =
                            cx.update(GitHostingProviderRegistry::default_global)?;

                        let (provider, remote) =
                            parse_git_remote_url(provider_registry, &origin_url)
                                .ok_or_else(|| anyhow!("failed to parse Git remote URL"))?;

                        let path = repo_path
                            .to_str()
                            .ok_or_else(|| anyhow!("failed to convert path to string"))?;

                        Ok(provider.build_permalink(
                            remote,
                            BuildPermalinkParams {
                                sha: &sha,
                                path,
                                selection: Some(selection),
                            },
                        ))
                    }
                    RepositoryState::Remote { project_id, client } => {
                        let response = client
                            .request(proto::GetPermalinkToLine {
                                project_id: project_id.to_proto(),
                                buffer_id: buffer_id.into(),
                                selection: Some(proto::Range {
                                    start: selection.start as u64,
                                    end: selection.end as u64,
                                }),
                            })
                            .await?;

                        url::Url::parse(&response.permalink).context("failed to parse permalink")
                    }
                }
            })
        });
        cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
    }
1046
1047 fn downstream_client(&self) -> Option<(AnyProtoClient, ProjectId)> {
1048 match &self.state {
1049 GitStoreState::Local {
1050 downstream: downstream_client,
1051 ..
1052 } => downstream_client
1053 .as_ref()
1054 .map(|state| (state.client.clone(), state.project_id)),
1055 GitStoreState::Ssh {
1056 downstream: downstream_client,
1057 ..
1058 } => downstream_client.clone(),
1059 GitStoreState::Remote { .. } => None,
1060 }
1061 }
1062
1063 fn upstream_client(&self) -> Option<AnyProtoClient> {
1064 match &self.state {
1065 GitStoreState::Local { .. } => None,
1066 GitStoreState::Ssh {
1067 upstream_client, ..
1068 }
1069 | GitStoreState::Remote {
1070 upstream_client, ..
1071 } => Some(upstream_client.clone()),
1072 }
1073 }
1074
    /// Reacts to worktree-store events. Only local stores handle them; in
    /// every other mode this returns immediately.
    ///
    /// * `WorktreeUpdatedEntries`: groups the changed paths by the repository
    ///   that contains them and notifies each repository via `paths_changed`.
    /// * `WorktreeUpdatedGitRepositories`: for visible worktrees, updates the
    ///   set of repositories and then processes the changed repositories.
    fn on_worktree_store_event(
        &mut self,
        worktree_store: Entity<WorktreeStore>,
        event: &WorktreeStoreEvent,
        cx: &mut Context<Self>,
    ) {
        let GitStoreState::Local {
            project_environment,
            downstream,
            next_repository_id,
            fs,
        } = &self.state
        else {
            return;
        };

        match event {
            WorktreeStoreEvent::WorktreeUpdatedEntries(worktree_id, updated_entries) => {
                let mut paths_by_git_repo = HashMap::<_, Vec<_>>::default();
                for (relative_path, _, _) in updated_entries.iter() {
                    // Paths that are not inside any known repository are ignored.
                    let Some((repo, repo_path)) = self.repository_and_path_for_project_path(
                        &(*worktree_id, relative_path.clone()).into(),
                        cx,
                    ) else {
                        continue;
                    };
                    paths_by_git_repo.entry(repo).or_default().push(repo_path)
                }

                for (repo, paths) in paths_by_git_repo {
                    repo.update(cx, |repo, cx| {
                        repo.paths_changed(
                            paths,
                            // Pass the downstream update channel along when shared.
                            downstream
                                .as_ref()
                                .map(|downstream| downstream.updates_tx.clone()),
                            cx,
                        );
                    });
                }
            }
            WorktreeStoreEvent::WorktreeUpdatedGitRepositories(worktree_id, changed_repos) => {
                let Some(worktree) = worktree_store.read(cx).worktree_for_id(*worktree_id, cx)
                else {
                    return;
                };
                if !worktree.read(cx).is_visible() {
                    log::debug!(
                        "not adding repositories for local worktree {:?} because it's not visible",
                        worktree.read(cx).abs_path()
                    );
                    return;
                }
                self.update_repositories_from_worktree(
                    project_environment.clone(),
                    next_repository_id.clone(),
                    downstream
                        .as_ref()
                        .map(|downstream| downstream.updates_tx.clone()),
                    changed_repos.clone(),
                    fs.clone(),
                    cx,
                );
                self.local_worktree_git_repos_changed(worktree, changed_repos, cx);
            }
            _ => {}
        }
    }
1143
1144 fn on_repository_event(
1145 &mut self,
1146 repo: Entity<Repository>,
1147 event: &RepositoryEvent,
1148 cx: &mut Context<Self>,
1149 ) {
1150 let id = repo.read(cx).id;
1151 let merge_conflicts = repo.read(cx).snapshot.merge.conflicted_paths.clone();
1152 for (buffer_id, diff) in self.diffs.iter() {
1153 if let Some((buffer_repo, repo_path)) =
1154 self.repository_and_path_for_buffer_id(*buffer_id, cx)
1155 {
1156 if buffer_repo == repo {
1157 diff.update(cx, |diff, cx| {
1158 if let Some(conflict_set) = &diff.conflict_set {
1159 let conflict_status_changed =
1160 conflict_set.update(cx, |conflict_set, cx| {
1161 let has_conflict = merge_conflicts.contains(&repo_path);
1162 conflict_set.set_has_conflict(has_conflict, cx)
1163 })?;
1164 if conflict_status_changed {
1165 let buffer_store = self.buffer_store.read(cx);
1166 if let Some(buffer) = buffer_store.get(*buffer_id) {
1167 let _ = diff.reparse_conflict_markers(
1168 buffer.read(cx).text_snapshot(),
1169 cx,
1170 );
1171 }
1172 }
1173 }
1174 anyhow::Ok(())
1175 })
1176 .ok();
1177 }
1178 }
1179 }
1180 cx.emit(GitStoreEvent::RepositoryUpdated(
1181 id,
1182 event.clone(),
1183 self.active_repo_id == Some(id),
1184 ))
1185 }
1186
1187 fn on_jobs_updated(&mut self, _: Entity<Repository>, _: &JobsUpdated, cx: &mut Context<Self>) {
1188 cx.emit(GitStoreEvent::JobsUpdated)
1189 }
1190
/// Update our list of repositories and schedule git scans in response to a notification from a worktree.
///
/// For each entry in `updated_git_repositories`:
/// * if an existing repository's work directory matches the entry's old or new
///   work directory, it is either re-pointed at the new work directory and
///   rescanned, or (when the entry has no new work directory) removed;
/// * otherwise, if the entry carries all of the paths needed to open a repository,
///   a new local repository is created, scanned, subscribed to, and registered.
///
/// Removed repositories are reported to `updates_tx` when one is provided.
fn update_repositories_from_worktree(
    &mut self,
    project_environment: Entity<ProjectEnvironment>,
    next_repository_id: Arc<AtomicU64>,
    updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
    updated_git_repositories: UpdatedGitRepositoriesSet,
    fs: Arc<dyn Fs>,
    cx: &mut Context<Self>,
) {
    // Removals are deferred until after the loop, which iterates `self.repositories`.
    let mut removed_ids = Vec::new();
    for update in updated_git_repositories.iter() {
        // Look for a repository we already track at the update's old or new work directory.
        if let Some((id, existing)) = self.repositories.iter().find(|(_, repo)| {
            let existing_work_directory_abs_path =
                repo.read(cx).work_directory_abs_path.clone();
            Some(&existing_work_directory_abs_path)
                == update.old_work_directory_abs_path.as_ref()
                || Some(&existing_work_directory_abs_path)
                    == update.new_work_directory_abs_path.as_ref()
        }) {
            if let Some(new_work_directory_abs_path) =
                update.new_work_directory_abs_path.clone()
            {
                // The repository still exists: record its (possibly new) location and rescan.
                existing.update(cx, |existing, cx| {
                    existing.snapshot.work_directory_abs_path = new_work_directory_abs_path;
                    existing.schedule_scan(updates_tx.clone(), cx);
                });
            } else {
                // No new work directory: the repository is gone.
                removed_ids.push(*id);
            }
        } else if let UpdatedGitRepository {
            new_work_directory_abs_path: Some(work_directory_abs_path),
            dot_git_abs_path: Some(dot_git_abs_path),
            repository_dir_abs_path: Some(repository_dir_abs_path),
            common_dir_abs_path: Some(common_dir_abs_path),
            ..
        } = update
        {
            // A repository we have not seen before: allocate an id and start tracking it.
            let id = RepositoryId(next_repository_id.fetch_add(1, atomic::Ordering::Release));
            let git_store = cx.weak_entity();
            let repo = cx.new(|cx| {
                let mut repo = Repository::local(
                    id,
                    work_directory_abs_path.clone(),
                    dot_git_abs_path.clone(),
                    repository_dir_abs_path.clone(),
                    common_dir_abs_path.clone(),
                    project_environment.downgrade(),
                    fs.clone(),
                    git_store,
                    cx,
                );
                repo.schedule_scan(updates_tx.clone(), cx);
                repo
            });
            self._subscriptions
                .push(cx.subscribe(&repo, Self::on_repository_event));
            self._subscriptions
                .push(cx.subscribe(&repo, Self::on_jobs_updated));
            self.repositories.insert(id, repo);
            cx.emit(GitStoreEvent::RepositoryAdded(id));
            // The first repository to be added becomes the active one.
            self.active_repo_id.get_or_insert_with(|| {
                cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
                id
            });
        }
    }

    for id in removed_ids {
        if self.active_repo_id == Some(id) {
            self.active_repo_id = None;
            cx.emit(GitStoreEvent::ActiveRepositoryChanged(None));
        }
        self.repositories.remove(&id);
        // Send errors are ignored (`.ok()`).
        if let Some(updates_tx) = updates_tx.as_ref() {
            updates_tx
                .unbounded_send(DownstreamUpdate::RemoveRepository(id))
                .ok();
        }
    }
}
1272
1273 fn on_buffer_store_event(
1274 &mut self,
1275 _: Entity<BufferStore>,
1276 event: &BufferStoreEvent,
1277 cx: &mut Context<Self>,
1278 ) {
1279 match event {
1280 BufferStoreEvent::BufferAdded(buffer) => {
1281 cx.subscribe(&buffer, |this, buffer, event, cx| {
1282 if let BufferEvent::LanguageChanged = event {
1283 let buffer_id = buffer.read(cx).remote_id();
1284 if let Some(diff_state) = this.diffs.get(&buffer_id) {
1285 diff_state.update(cx, |diff_state, cx| {
1286 diff_state.buffer_language_changed(buffer, cx);
1287 });
1288 }
1289 }
1290 })
1291 .detach();
1292 }
1293 BufferStoreEvent::SharedBufferClosed(peer_id, buffer_id) => {
1294 if let Some(diffs) = self.shared_diffs.get_mut(peer_id) {
1295 diffs.remove(buffer_id);
1296 }
1297 }
1298 BufferStoreEvent::BufferDropped(buffer_id) => {
1299 self.diffs.remove(&buffer_id);
1300 for diffs in self.shared_diffs.values_mut() {
1301 diffs.remove(buffer_id);
1302 }
1303 }
1304
1305 _ => {}
1306 }
1307 }
1308
1309 pub fn recalculate_buffer_diffs(
1310 &mut self,
1311 buffers: Vec<Entity<Buffer>>,
1312 cx: &mut Context<Self>,
1313 ) -> impl Future<Output = ()> + use<> {
1314 let mut futures = Vec::new();
1315 for buffer in buffers {
1316 if let Some(diff_state) = self.diffs.get_mut(&buffer.read(cx).remote_id()) {
1317 let buffer = buffer.read(cx).text_snapshot();
1318 diff_state.update(cx, |diff_state, cx| {
1319 diff_state.recalculate_diffs(buffer.clone(), cx);
1320 futures.extend(diff_state.wait_for_recalculation().map(FutureExt::boxed));
1321 });
1322 futures.push(diff_state.update(cx, |diff_state, cx| {
1323 diff_state
1324 .reparse_conflict_markers(buffer, cx)
1325 .map(|_| {})
1326 .boxed()
1327 }));
1328 }
1329 }
1330 async move {
1331 futures::future::join_all(futures).await;
1332 }
1333 }
1334
1335 fn on_buffer_diff_event(
1336 &mut self,
1337 diff: Entity<buffer_diff::BufferDiff>,
1338 event: &BufferDiffEvent,
1339 cx: &mut Context<Self>,
1340 ) {
1341 if let BufferDiffEvent::HunksStagedOrUnstaged(new_index_text) = event {
1342 let buffer_id = diff.read(cx).buffer_id;
1343 if let Some(diff_state) = self.diffs.get(&buffer_id) {
1344 let hunk_staging_operation_count = diff_state.update(cx, |diff_state, _| {
1345 diff_state.hunk_staging_operation_count += 1;
1346 diff_state.hunk_staging_operation_count
1347 });
1348 if let Some((repo, path)) = self.repository_and_path_for_buffer_id(buffer_id, cx) {
1349 let recv = repo.update(cx, |repo, cx| {
1350 log::debug!("hunks changed for {}", path.display());
1351 repo.spawn_set_index_text_job(
1352 path,
1353 new_index_text.as_ref().map(|rope| rope.to_string()),
1354 Some(hunk_staging_operation_count),
1355 cx,
1356 )
1357 });
1358 let diff = diff.downgrade();
1359 cx.spawn(async move |this, cx| {
1360 if let Ok(Err(error)) = cx.background_spawn(recv).await {
1361 diff.update(cx, |diff, cx| {
1362 diff.clear_pending_hunks(cx);
1363 })
1364 .ok();
1365 this.update(cx, |_, cx| cx.emit(GitStoreEvent::IndexWriteError(error)))
1366 .ok();
1367 }
1368 })
1369 .detach();
1370 }
1371 }
1372 }
1373 }
1374
1375 fn local_worktree_git_repos_changed(
1376 &mut self,
1377 worktree: Entity<Worktree>,
1378 changed_repos: &UpdatedGitRepositoriesSet,
1379 cx: &mut Context<Self>,
1380 ) {
1381 log::debug!("local worktree repos changed");
1382 debug_assert!(worktree.read(cx).is_local());
1383
1384 for repository in self.repositories.values() {
1385 repository.update(cx, |repository, cx| {
1386 let repo_abs_path = &repository.work_directory_abs_path;
1387 if changed_repos.iter().any(|update| {
1388 update.old_work_directory_abs_path.as_ref() == Some(&repo_abs_path)
1389 || update.new_work_directory_abs_path.as_ref() == Some(&repo_abs_path)
1390 }) {
1391 repository.reload_buffer_diff_bases(cx);
1392 }
1393 });
1394 }
1395 }
1396
/// Returns the repositories currently tracked by this store, keyed by repository id.
pub fn repositories(&self) -> &HashMap<RepositoryId, Entity<Repository>> {
    &self.repositories
}
1400
1401 pub fn status_for_buffer_id(&self, buffer_id: BufferId, cx: &App) -> Option<FileStatus> {
1402 let (repo, path) = self.repository_and_path_for_buffer_id(buffer_id, cx)?;
1403 let status = repo.read(cx).snapshot.status_for_path(&path)?;
1404 Some(status.status)
1405 }
1406
1407 pub fn repository_and_path_for_buffer_id(
1408 &self,
1409 buffer_id: BufferId,
1410 cx: &App,
1411 ) -> Option<(Entity<Repository>, RepoPath)> {
1412 let buffer = self.buffer_store.read(cx).get(buffer_id)?;
1413 let project_path = buffer.read(cx).project_path(cx)?;
1414 self.repository_and_path_for_project_path(&project_path, cx)
1415 }
1416
1417 pub fn repository_and_path_for_project_path(
1418 &self,
1419 path: &ProjectPath,
1420 cx: &App,
1421 ) -> Option<(Entity<Repository>, RepoPath)> {
1422 let abs_path = self.worktree_store.read(cx).absolutize(path, cx)?;
1423 self.repositories
1424 .values()
1425 .filter_map(|repo| {
1426 let repo_path = repo.read(cx).abs_path_to_repo_path(&abs_path)?;
1427 Some((repo.clone(), repo_path))
1428 })
1429 .max_by_key(|(repo, _)| repo.read(cx).work_directory_abs_path.clone())
1430 }
1431
1432 pub fn git_init(
1433 &self,
1434 path: Arc<Path>,
1435 fallback_branch_name: String,
1436 cx: &App,
1437 ) -> Task<Result<()>> {
1438 match &self.state {
1439 GitStoreState::Local { fs, .. } => {
1440 let fs = fs.clone();
1441 cx.background_executor()
1442 .spawn(async move { fs.git_init(&path, fallback_branch_name) })
1443 }
1444 GitStoreState::Ssh {
1445 upstream_client,
1446 upstream_project_id: project_id,
1447 ..
1448 }
1449 | GitStoreState::Remote {
1450 upstream_client,
1451 upstream_project_id: project_id,
1452 ..
1453 } => {
1454 let client = upstream_client.clone();
1455 let project_id = *project_id;
1456 cx.background_executor().spawn(async move {
1457 client
1458 .request(proto::GitInit {
1459 project_id: project_id.0,
1460 abs_path: path.to_string_lossy().to_string(),
1461 fallback_branch_name,
1462 })
1463 .await?;
1464 Ok(())
1465 })
1466 }
1467 }
1468 }
1469
1470 async fn handle_update_repository(
1471 this: Entity<Self>,
1472 envelope: TypedEnvelope<proto::UpdateRepository>,
1473 mut cx: AsyncApp,
1474 ) -> Result<()> {
1475 this.update(&mut cx, |this, cx| {
1476 let mut update = envelope.payload;
1477
1478 let id = RepositoryId::from_proto(update.id);
1479 let client = this
1480 .upstream_client()
1481 .context("no upstream client")?
1482 .clone();
1483
1484 let mut is_new = false;
1485 let repo = this.repositories.entry(id).or_insert_with(|| {
1486 is_new = true;
1487 let git_store = cx.weak_entity();
1488 cx.new(|cx| {
1489 Repository::remote(
1490 id,
1491 Path::new(&update.abs_path).into(),
1492 ProjectId(update.project_id),
1493 client,
1494 git_store,
1495 cx,
1496 )
1497 })
1498 });
1499 if is_new {
1500 this._subscriptions
1501 .push(cx.subscribe(&repo, Self::on_repository_event))
1502 }
1503
1504 repo.update(cx, {
1505 let update = update.clone();
1506 |repo, cx| repo.apply_remote_update(update, cx)
1507 })?;
1508
1509 this.active_repo_id.get_or_insert_with(|| {
1510 cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
1511 id
1512 });
1513
1514 if let Some((client, project_id)) = this.downstream_client() {
1515 update.project_id = project_id.to_proto();
1516 client.send(update).log_err();
1517 }
1518 Ok(())
1519 })?
1520 }
1521
1522 async fn handle_remove_repository(
1523 this: Entity<Self>,
1524 envelope: TypedEnvelope<proto::RemoveRepository>,
1525 mut cx: AsyncApp,
1526 ) -> Result<()> {
1527 this.update(&mut cx, |this, cx| {
1528 let mut update = envelope.payload;
1529 let id = RepositoryId::from_proto(update.id);
1530 this.repositories.remove(&id);
1531 if let Some((client, project_id)) = this.downstream_client() {
1532 update.project_id = project_id.to_proto();
1533 client.send(update).log_err();
1534 }
1535 if this.active_repo_id == Some(id) {
1536 this.active_repo_id = None;
1537 cx.emit(GitStoreEvent::ActiveRepositoryChanged(None));
1538 }
1539 cx.emit(GitStoreEvent::RepositoryRemoved(id));
1540 })
1541 }
1542
1543 async fn handle_git_init(
1544 this: Entity<Self>,
1545 envelope: TypedEnvelope<proto::GitInit>,
1546 cx: AsyncApp,
1547 ) -> Result<proto::Ack> {
1548 let path: Arc<Path> = PathBuf::from(envelope.payload.abs_path).into();
1549 let name = envelope.payload.fallback_branch_name;
1550 cx.update(|cx| this.read(cx).git_init(path, name, cx))?
1551 .await?;
1552
1553 Ok(proto::Ack {})
1554 }
1555
1556 async fn handle_fetch(
1557 this: Entity<Self>,
1558 envelope: TypedEnvelope<proto::Fetch>,
1559 mut cx: AsyncApp,
1560 ) -> Result<proto::RemoteMessageResponse> {
1561 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1562 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1563 let askpass_id = envelope.payload.askpass_id;
1564
1565 let askpass = make_remote_delegate(
1566 this,
1567 envelope.payload.project_id,
1568 repository_id,
1569 askpass_id,
1570 &mut cx,
1571 );
1572
1573 let remote_output = repository_handle
1574 .update(&mut cx, |repository_handle, cx| {
1575 repository_handle.fetch(askpass, cx)
1576 })?
1577 .await??;
1578
1579 Ok(proto::RemoteMessageResponse {
1580 stdout: remote_output.stdout,
1581 stderr: remote_output.stderr,
1582 })
1583 }
1584
1585 async fn handle_push(
1586 this: Entity<Self>,
1587 envelope: TypedEnvelope<proto::Push>,
1588 mut cx: AsyncApp,
1589 ) -> Result<proto::RemoteMessageResponse> {
1590 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1591 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1592
1593 let askpass_id = envelope.payload.askpass_id;
1594 let askpass = make_remote_delegate(
1595 this,
1596 envelope.payload.project_id,
1597 repository_id,
1598 askpass_id,
1599 &mut cx,
1600 );
1601
1602 let options = envelope
1603 .payload
1604 .options
1605 .as_ref()
1606 .map(|_| match envelope.payload.options() {
1607 proto::push::PushOptions::SetUpstream => git::repository::PushOptions::SetUpstream,
1608 proto::push::PushOptions::Force => git::repository::PushOptions::Force,
1609 });
1610
1611 let branch_name = envelope.payload.branch_name.into();
1612 let remote_name = envelope.payload.remote_name.into();
1613
1614 let remote_output = repository_handle
1615 .update(&mut cx, |repository_handle, cx| {
1616 repository_handle.push(branch_name, remote_name, options, askpass, cx)
1617 })?
1618 .await??;
1619 Ok(proto::RemoteMessageResponse {
1620 stdout: remote_output.stdout,
1621 stderr: remote_output.stderr,
1622 })
1623 }
1624
1625 async fn handle_pull(
1626 this: Entity<Self>,
1627 envelope: TypedEnvelope<proto::Pull>,
1628 mut cx: AsyncApp,
1629 ) -> Result<proto::RemoteMessageResponse> {
1630 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1631 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1632 let askpass_id = envelope.payload.askpass_id;
1633 let askpass = make_remote_delegate(
1634 this,
1635 envelope.payload.project_id,
1636 repository_id,
1637 askpass_id,
1638 &mut cx,
1639 );
1640
1641 let branch_name = envelope.payload.branch_name.into();
1642 let remote_name = envelope.payload.remote_name.into();
1643
1644 let remote_message = repository_handle
1645 .update(&mut cx, |repository_handle, cx| {
1646 repository_handle.pull(branch_name, remote_name, askpass, cx)
1647 })?
1648 .await??;
1649
1650 Ok(proto::RemoteMessageResponse {
1651 stdout: remote_message.stdout,
1652 stderr: remote_message.stderr,
1653 })
1654 }
1655
1656 async fn handle_stage(
1657 this: Entity<Self>,
1658 envelope: TypedEnvelope<proto::Stage>,
1659 mut cx: AsyncApp,
1660 ) -> Result<proto::Ack> {
1661 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1662 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1663
1664 let entries = envelope
1665 .payload
1666 .paths
1667 .into_iter()
1668 .map(PathBuf::from)
1669 .map(RepoPath::new)
1670 .collect();
1671
1672 repository_handle
1673 .update(&mut cx, |repository_handle, cx| {
1674 repository_handle.stage_entries(entries, cx)
1675 })?
1676 .await?;
1677 Ok(proto::Ack {})
1678 }
1679
1680 async fn handle_unstage(
1681 this: Entity<Self>,
1682 envelope: TypedEnvelope<proto::Unstage>,
1683 mut cx: AsyncApp,
1684 ) -> Result<proto::Ack> {
1685 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1686 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1687
1688 let entries = envelope
1689 .payload
1690 .paths
1691 .into_iter()
1692 .map(PathBuf::from)
1693 .map(RepoPath::new)
1694 .collect();
1695
1696 repository_handle
1697 .update(&mut cx, |repository_handle, cx| {
1698 repository_handle.unstage_entries(entries, cx)
1699 })?
1700 .await?;
1701
1702 Ok(proto::Ack {})
1703 }
1704
1705 async fn handle_set_index_text(
1706 this: Entity<Self>,
1707 envelope: TypedEnvelope<proto::SetIndexText>,
1708 mut cx: AsyncApp,
1709 ) -> Result<proto::Ack> {
1710 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1711 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1712 let repo_path = RepoPath::from_str(&envelope.payload.path);
1713
1714 repository_handle
1715 .update(&mut cx, |repository_handle, cx| {
1716 repository_handle.spawn_set_index_text_job(
1717 repo_path,
1718 envelope.payload.text,
1719 None,
1720 cx,
1721 )
1722 })?
1723 .await??;
1724 Ok(proto::Ack {})
1725 }
1726
1727 async fn handle_commit(
1728 this: Entity<Self>,
1729 envelope: TypedEnvelope<proto::Commit>,
1730 mut cx: AsyncApp,
1731 ) -> Result<proto::Ack> {
1732 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1733 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1734
1735 let message = SharedString::from(envelope.payload.message);
1736 let name = envelope.payload.name.map(SharedString::from);
1737 let email = envelope.payload.email.map(SharedString::from);
1738 let options = envelope.payload.options.unwrap_or_default();
1739
1740 repository_handle
1741 .update(&mut cx, |repository_handle, cx| {
1742 repository_handle.commit(
1743 message,
1744 name.zip(email),
1745 CommitOptions {
1746 amend: options.amend,
1747 },
1748 cx,
1749 )
1750 })?
1751 .await??;
1752 Ok(proto::Ack {})
1753 }
1754
1755 async fn handle_get_remotes(
1756 this: Entity<Self>,
1757 envelope: TypedEnvelope<proto::GetRemotes>,
1758 mut cx: AsyncApp,
1759 ) -> Result<proto::GetRemotesResponse> {
1760 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1761 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1762
1763 let branch_name = envelope.payload.branch_name;
1764
1765 let remotes = repository_handle
1766 .update(&mut cx, |repository_handle, _| {
1767 repository_handle.get_remotes(branch_name)
1768 })?
1769 .await??;
1770
1771 Ok(proto::GetRemotesResponse {
1772 remotes: remotes
1773 .into_iter()
1774 .map(|remotes| proto::get_remotes_response::Remote {
1775 name: remotes.name.to_string(),
1776 })
1777 .collect::<Vec<_>>(),
1778 })
1779 }
1780
1781 async fn handle_get_branches(
1782 this: Entity<Self>,
1783 envelope: TypedEnvelope<proto::GitGetBranches>,
1784 mut cx: AsyncApp,
1785 ) -> Result<proto::GitBranchesResponse> {
1786 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1787 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1788
1789 let branches = repository_handle
1790 .update(&mut cx, |repository_handle, _| repository_handle.branches())?
1791 .await??;
1792
1793 Ok(proto::GitBranchesResponse {
1794 branches: branches
1795 .into_iter()
1796 .map(|branch| branch_to_proto(&branch))
1797 .collect::<Vec<_>>(),
1798 })
1799 }
1800 async fn handle_create_branch(
1801 this: Entity<Self>,
1802 envelope: TypedEnvelope<proto::GitCreateBranch>,
1803 mut cx: AsyncApp,
1804 ) -> Result<proto::Ack> {
1805 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1806 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1807 let branch_name = envelope.payload.branch_name;
1808
1809 repository_handle
1810 .update(&mut cx, |repository_handle, _| {
1811 repository_handle.create_branch(branch_name)
1812 })?
1813 .await??;
1814
1815 Ok(proto::Ack {})
1816 }
1817
1818 async fn handle_change_branch(
1819 this: Entity<Self>,
1820 envelope: TypedEnvelope<proto::GitChangeBranch>,
1821 mut cx: AsyncApp,
1822 ) -> Result<proto::Ack> {
1823 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1824 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1825 let branch_name = envelope.payload.branch_name;
1826
1827 repository_handle
1828 .update(&mut cx, |repository_handle, _| {
1829 repository_handle.change_branch(branch_name)
1830 })?
1831 .await??;
1832
1833 Ok(proto::Ack {})
1834 }
1835
1836 async fn handle_show(
1837 this: Entity<Self>,
1838 envelope: TypedEnvelope<proto::GitShow>,
1839 mut cx: AsyncApp,
1840 ) -> Result<proto::GitCommitDetails> {
1841 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1842 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1843
1844 let commit = repository_handle
1845 .update(&mut cx, |repository_handle, _| {
1846 repository_handle.show(envelope.payload.commit)
1847 })?
1848 .await??;
1849 Ok(proto::GitCommitDetails {
1850 sha: commit.sha.into(),
1851 message: commit.message.into(),
1852 commit_timestamp: commit.commit_timestamp,
1853 author_email: commit.author_email.into(),
1854 author_name: commit.author_name.into(),
1855 })
1856 }
1857
1858 async fn handle_load_commit_diff(
1859 this: Entity<Self>,
1860 envelope: TypedEnvelope<proto::LoadCommitDiff>,
1861 mut cx: AsyncApp,
1862 ) -> Result<proto::LoadCommitDiffResponse> {
1863 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1864 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1865
1866 let commit_diff = repository_handle
1867 .update(&mut cx, |repository_handle, _| {
1868 repository_handle.load_commit_diff(envelope.payload.commit)
1869 })?
1870 .await??;
1871 Ok(proto::LoadCommitDiffResponse {
1872 files: commit_diff
1873 .files
1874 .into_iter()
1875 .map(|file| proto::CommitFile {
1876 path: file.path.to_string(),
1877 old_text: file.old_text,
1878 new_text: file.new_text,
1879 })
1880 .collect(),
1881 })
1882 }
1883
1884 async fn handle_reset(
1885 this: Entity<Self>,
1886 envelope: TypedEnvelope<proto::GitReset>,
1887 mut cx: AsyncApp,
1888 ) -> Result<proto::Ack> {
1889 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1890 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1891
1892 let mode = match envelope.payload.mode() {
1893 git_reset::ResetMode::Soft => ResetMode::Soft,
1894 git_reset::ResetMode::Mixed => ResetMode::Mixed,
1895 };
1896
1897 repository_handle
1898 .update(&mut cx, |repository_handle, cx| {
1899 repository_handle.reset(envelope.payload.commit, mode, cx)
1900 })?
1901 .await??;
1902 Ok(proto::Ack {})
1903 }
1904
1905 async fn handle_checkout_files(
1906 this: Entity<Self>,
1907 envelope: TypedEnvelope<proto::GitCheckoutFiles>,
1908 mut cx: AsyncApp,
1909 ) -> Result<proto::Ack> {
1910 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1911 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1912 let paths = envelope
1913 .payload
1914 .paths
1915 .iter()
1916 .map(|s| RepoPath::from_str(s))
1917 .collect();
1918
1919 repository_handle
1920 .update(&mut cx, |repository_handle, cx| {
1921 repository_handle.checkout_files(&envelope.payload.commit, paths, cx)
1922 })?
1923 .await??;
1924 Ok(proto::Ack {})
1925 }
1926
1927 async fn handle_open_commit_message_buffer(
1928 this: Entity<Self>,
1929 envelope: TypedEnvelope<proto::OpenCommitMessageBuffer>,
1930 mut cx: AsyncApp,
1931 ) -> Result<proto::OpenBufferResponse> {
1932 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1933 let repository = Self::repository_for_request(&this, repository_id, &mut cx)?;
1934 let buffer = repository
1935 .update(&mut cx, |repository, cx| {
1936 repository.open_commit_buffer(None, this.read(cx).buffer_store.clone(), cx)
1937 })?
1938 .await?;
1939
1940 let buffer_id = buffer.read_with(&cx, |buffer, _| buffer.remote_id())?;
1941 this.update(&mut cx, |this, cx| {
1942 this.buffer_store.update(cx, |buffer_store, cx| {
1943 buffer_store
1944 .create_buffer_for_peer(
1945 &buffer,
1946 envelope.original_sender_id.unwrap_or(envelope.sender_id),
1947 cx,
1948 )
1949 .detach_and_log_err(cx);
1950 })
1951 })?;
1952
1953 Ok(proto::OpenBufferResponse {
1954 buffer_id: buffer_id.to_proto(),
1955 })
1956 }
1957
1958 async fn handle_askpass(
1959 this: Entity<Self>,
1960 envelope: TypedEnvelope<proto::AskPassRequest>,
1961 mut cx: AsyncApp,
1962 ) -> Result<proto::AskPassResponse> {
1963 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1964 let repository = Self::repository_for_request(&this, repository_id, &mut cx)?;
1965
1966 let delegates = cx.update(|cx| repository.read(cx).askpass_delegates.clone())?;
1967 let Some(mut askpass) = delegates.lock().remove(&envelope.payload.askpass_id) else {
1968 debug_panic!("no askpass found");
1969 return Err(anyhow::anyhow!("no askpass found"));
1970 };
1971
1972 let response = askpass.ask_password(envelope.payload.prompt).await?;
1973
1974 delegates
1975 .lock()
1976 .insert(envelope.payload.askpass_id, askpass);
1977
1978 Ok(proto::AskPassResponse { response })
1979 }
1980
1981 async fn handle_check_for_pushed_commits(
1982 this: Entity<Self>,
1983 envelope: TypedEnvelope<proto::CheckForPushedCommits>,
1984 mut cx: AsyncApp,
1985 ) -> Result<proto::CheckForPushedCommitsResponse> {
1986 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1987 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1988
1989 let branches = repository_handle
1990 .update(&mut cx, |repository_handle, _| {
1991 repository_handle.check_for_pushed_commits()
1992 })?
1993 .await??;
1994 Ok(proto::CheckForPushedCommitsResponse {
1995 pushed_to: branches
1996 .into_iter()
1997 .map(|commit| commit.to_string())
1998 .collect(),
1999 })
2000 }
2001
2002 async fn handle_git_diff(
2003 this: Entity<Self>,
2004 envelope: TypedEnvelope<proto::GitDiff>,
2005 mut cx: AsyncApp,
2006 ) -> Result<proto::GitDiffResponse> {
2007 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2008 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2009 let diff_type = match envelope.payload.diff_type() {
2010 proto::git_diff::DiffType::HeadToIndex => DiffType::HeadToIndex,
2011 proto::git_diff::DiffType::HeadToWorktree => DiffType::HeadToWorktree,
2012 };
2013
2014 let mut diff = repository_handle
2015 .update(&mut cx, |repository_handle, cx| {
2016 repository_handle.diff(diff_type, cx)
2017 })?
2018 .await??;
2019 const ONE_MB: usize = 1_000_000;
2020 if diff.len() > ONE_MB {
2021 diff = diff.chars().take(ONE_MB).collect()
2022 }
2023
2024 Ok(proto::GitDiffResponse { diff })
2025 }
2026
2027 async fn handle_open_unstaged_diff(
2028 this: Entity<Self>,
2029 request: TypedEnvelope<proto::OpenUnstagedDiff>,
2030 mut cx: AsyncApp,
2031 ) -> Result<proto::OpenUnstagedDiffResponse> {
2032 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2033 let diff = this
2034 .update(&mut cx, |this, cx| {
2035 let buffer = this.buffer_store.read(cx).get(buffer_id)?;
2036 Some(this.open_unstaged_diff(buffer, cx))
2037 })?
2038 .ok_or_else(|| anyhow!("no such buffer"))?
2039 .await?;
2040 this.update(&mut cx, |this, _| {
2041 let shared_diffs = this
2042 .shared_diffs
2043 .entry(request.original_sender_id.unwrap_or(request.sender_id))
2044 .or_default();
2045 shared_diffs.entry(buffer_id).or_default().unstaged = Some(diff.clone());
2046 })?;
2047 let staged_text = diff.read_with(&cx, |diff, _| diff.base_text_string())?;
2048 Ok(proto::OpenUnstagedDiffResponse { staged_text })
2049 }
2050
2051 async fn handle_open_uncommitted_diff(
2052 this: Entity<Self>,
2053 request: TypedEnvelope<proto::OpenUncommittedDiff>,
2054 mut cx: AsyncApp,
2055 ) -> Result<proto::OpenUncommittedDiffResponse> {
2056 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2057 let diff = this
2058 .update(&mut cx, |this, cx| {
2059 let buffer = this.buffer_store.read(cx).get(buffer_id)?;
2060 Some(this.open_uncommitted_diff(buffer, cx))
2061 })?
2062 .ok_or_else(|| anyhow!("no such buffer"))?
2063 .await?;
2064 this.update(&mut cx, |this, _| {
2065 let shared_diffs = this
2066 .shared_diffs
2067 .entry(request.original_sender_id.unwrap_or(request.sender_id))
2068 .or_default();
2069 shared_diffs.entry(buffer_id).or_default().uncommitted = Some(diff.clone());
2070 })?;
2071 diff.read_with(&cx, |diff, cx| {
2072 use proto::open_uncommitted_diff_response::Mode;
2073
2074 let unstaged_diff = diff.secondary_diff();
2075 let index_snapshot = unstaged_diff.and_then(|diff| {
2076 let diff = diff.read(cx);
2077 diff.base_text_exists().then(|| diff.base_text())
2078 });
2079
2080 let mode;
2081 let staged_text;
2082 let committed_text;
2083 if diff.base_text_exists() {
2084 let committed_snapshot = diff.base_text();
2085 committed_text = Some(committed_snapshot.text());
2086 if let Some(index_text) = index_snapshot {
2087 if index_text.remote_id() == committed_snapshot.remote_id() {
2088 mode = Mode::IndexMatchesHead;
2089 staged_text = None;
2090 } else {
2091 mode = Mode::IndexAndHead;
2092 staged_text = Some(index_text.text());
2093 }
2094 } else {
2095 mode = Mode::IndexAndHead;
2096 staged_text = None;
2097 }
2098 } else {
2099 mode = Mode::IndexAndHead;
2100 committed_text = None;
2101 staged_text = index_snapshot.as_ref().map(|buffer| buffer.text());
2102 }
2103
2104 proto::OpenUncommittedDiffResponse {
2105 committed_text,
2106 staged_text,
2107 mode: mode.into(),
2108 }
2109 })
2110 }
2111
2112 async fn handle_update_diff_bases(
2113 this: Entity<Self>,
2114 request: TypedEnvelope<proto::UpdateDiffBases>,
2115 mut cx: AsyncApp,
2116 ) -> Result<()> {
2117 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2118 this.update(&mut cx, |this, cx| {
2119 if let Some(diff_state) = this.diffs.get_mut(&buffer_id) {
2120 if let Some(buffer) = this.buffer_store.read(cx).get(buffer_id) {
2121 let buffer = buffer.read(cx).text_snapshot();
2122 diff_state.update(cx, |diff_state, cx| {
2123 diff_state.handle_base_texts_updated(buffer, request.payload, cx);
2124 })
2125 }
2126 }
2127 })
2128 }
2129
2130 async fn handle_blame_buffer(
2131 this: Entity<Self>,
2132 envelope: TypedEnvelope<proto::BlameBuffer>,
2133 mut cx: AsyncApp,
2134 ) -> Result<proto::BlameBufferResponse> {
2135 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2136 let version = deserialize_version(&envelope.payload.version);
2137 let buffer = this.read_with(&cx, |this, cx| {
2138 this.buffer_store.read(cx).get_existing(buffer_id)
2139 })??;
2140 buffer
2141 .update(&mut cx, |buffer, _| {
2142 buffer.wait_for_version(version.clone())
2143 })?
2144 .await?;
2145 let blame = this
2146 .update(&mut cx, |this, cx| {
2147 this.blame_buffer(&buffer, Some(version), cx)
2148 })?
2149 .await?;
2150 Ok(serialize_blame_buffer_response(blame))
2151 }
2152
2153 async fn handle_get_permalink_to_line(
2154 this: Entity<Self>,
2155 envelope: TypedEnvelope<proto::GetPermalinkToLine>,
2156 mut cx: AsyncApp,
2157 ) -> Result<proto::GetPermalinkToLineResponse> {
2158 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2159 // let version = deserialize_version(&envelope.payload.version);
2160 let selection = {
2161 let proto_selection = envelope
2162 .payload
2163 .selection
2164 .context("no selection to get permalink for defined")?;
2165 proto_selection.start as u32..proto_selection.end as u32
2166 };
2167 let buffer = this.read_with(&cx, |this, cx| {
2168 this.buffer_store.read(cx).get_existing(buffer_id)
2169 })??;
2170 let permalink = this
2171 .update(&mut cx, |this, cx| {
2172 this.get_permalink_to_line(&buffer, selection, cx)
2173 })?
2174 .await?;
2175 Ok(proto::GetPermalinkToLineResponse {
2176 permalink: permalink.to_string(),
2177 })
2178 }
2179
2180 fn repository_for_request(
2181 this: &Entity<Self>,
2182 id: RepositoryId,
2183 cx: &mut AsyncApp,
2184 ) -> Result<Entity<Repository>> {
2185 this.update(cx, |this, _| {
2186 this.repositories
2187 .get(&id)
2188 .context("missing repository handle")
2189 .cloned()
2190 })?
2191 }
2192
2193 pub fn repo_snapshots(&self, cx: &App) -> HashMap<RepositoryId, RepositorySnapshot> {
2194 self.repositories
2195 .iter()
2196 .map(|(id, repo)| (*id, repo.read(cx).snapshot.clone()))
2197 .collect()
2198 }
2199}
2200
2201impl BufferGitState {
2202 fn new(_git_store: WeakEntity<GitStore>) -> Self {
2203 Self {
2204 unstaged_diff: Default::default(),
2205 uncommitted_diff: Default::default(),
2206 recalculate_diff_task: Default::default(),
2207 language: Default::default(),
2208 language_registry: Default::default(),
2209 recalculating_tx: postage::watch::channel_with(false).0,
2210 hunk_staging_operation_count: 0,
2211 hunk_staging_operation_count_as_of_write: 0,
2212 head_text: Default::default(),
2213 index_text: Default::default(),
2214 head_changed: Default::default(),
2215 index_changed: Default::default(),
2216 language_changed: Default::default(),
2217 conflict_updated_futures: Default::default(),
2218 conflict_set: Default::default(),
2219 reparse_conflict_markers_task: Default::default(),
2220 }
2221 }
2222
2223 fn buffer_language_changed(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) {
2224 self.language = buffer.read(cx).language().cloned();
2225 self.language_changed = true;
2226 let _ = self.recalculate_diffs(buffer.read(cx).text_snapshot(), cx);
2227 }
2228
2229 fn reparse_conflict_markers(
2230 &mut self,
2231 buffer: text::BufferSnapshot,
2232 cx: &mut Context<Self>,
2233 ) -> oneshot::Receiver<()> {
2234 let (tx, rx) = oneshot::channel();
2235
2236 let Some(conflict_set) = self
2237 .conflict_set
2238 .as_ref()
2239 .and_then(|conflict_set| conflict_set.upgrade())
2240 else {
2241 return rx;
2242 };
2243
2244 let old_snapshot = conflict_set.read_with(cx, |conflict_set, _| {
2245 if conflict_set.has_conflict {
2246 Some(conflict_set.snapshot())
2247 } else {
2248 None
2249 }
2250 });
2251
2252 if let Some(old_snapshot) = old_snapshot {
2253 self.conflict_updated_futures.push(tx);
2254 self.reparse_conflict_markers_task = Some(cx.spawn(async move |this, cx| {
2255 let (snapshot, changed_range) = cx
2256 .background_spawn(async move {
2257 let new_snapshot = ConflictSet::parse(&buffer);
2258 let changed_range = old_snapshot.compare(&new_snapshot, &buffer);
2259 (new_snapshot, changed_range)
2260 })
2261 .await;
2262 this.update(cx, |this, cx| {
2263 if let Some(conflict_set) = &this.conflict_set {
2264 conflict_set
2265 .update(cx, |conflict_set, cx| {
2266 conflict_set.set_snapshot(snapshot, changed_range, cx);
2267 })
2268 .ok();
2269 }
2270 let futures = std::mem::take(&mut this.conflict_updated_futures);
2271 for tx in futures {
2272 tx.send(()).ok();
2273 }
2274 })
2275 }))
2276 }
2277
2278 rx
2279 }
2280
2281 fn unstaged_diff(&self) -> Option<Entity<BufferDiff>> {
2282 self.unstaged_diff.as_ref().and_then(|set| set.upgrade())
2283 }
2284
2285 fn uncommitted_diff(&self) -> Option<Entity<BufferDiff>> {
2286 self.uncommitted_diff.as_ref().and_then(|set| set.upgrade())
2287 }
2288
2289 fn handle_base_texts_updated(
2290 &mut self,
2291 buffer: text::BufferSnapshot,
2292 message: proto::UpdateDiffBases,
2293 cx: &mut Context<Self>,
2294 ) {
2295 use proto::update_diff_bases::Mode;
2296
2297 let Some(mode) = Mode::from_i32(message.mode) else {
2298 return;
2299 };
2300
2301 let diff_bases_change = match mode {
2302 Mode::HeadOnly => DiffBasesChange::SetHead(message.committed_text),
2303 Mode::IndexOnly => DiffBasesChange::SetIndex(message.staged_text),
2304 Mode::IndexMatchesHead => DiffBasesChange::SetBoth(message.committed_text),
2305 Mode::IndexAndHead => DiffBasesChange::SetEach {
2306 index: message.staged_text,
2307 head: message.committed_text,
2308 },
2309 };
2310
2311 self.diff_bases_changed(buffer, Some(diff_bases_change), cx);
2312 }
2313
2314 pub fn wait_for_recalculation(&mut self) -> Option<impl Future<Output = ()> + use<>> {
2315 if *self.recalculating_tx.borrow() {
2316 let mut rx = self.recalculating_tx.subscribe();
2317 return Some(async move {
2318 loop {
2319 let is_recalculating = rx.recv().await;
2320 if is_recalculating != Some(true) {
2321 break;
2322 }
2323 }
2324 });
2325 } else {
2326 None
2327 }
2328 }
2329
2330 fn diff_bases_changed(
2331 &mut self,
2332 buffer: text::BufferSnapshot,
2333 diff_bases_change: Option<DiffBasesChange>,
2334 cx: &mut Context<Self>,
2335 ) {
2336 match diff_bases_change {
2337 Some(DiffBasesChange::SetIndex(index)) => {
2338 self.index_text = index.map(|mut index| {
2339 text::LineEnding::normalize(&mut index);
2340 Arc::new(index)
2341 });
2342 self.index_changed = true;
2343 }
2344 Some(DiffBasesChange::SetHead(head)) => {
2345 self.head_text = head.map(|mut head| {
2346 text::LineEnding::normalize(&mut head);
2347 Arc::new(head)
2348 });
2349 self.head_changed = true;
2350 }
2351 Some(DiffBasesChange::SetBoth(text)) => {
2352 let text = text.map(|mut text| {
2353 text::LineEnding::normalize(&mut text);
2354 Arc::new(text)
2355 });
2356 self.head_text = text.clone();
2357 self.index_text = text;
2358 self.head_changed = true;
2359 self.index_changed = true;
2360 }
2361 Some(DiffBasesChange::SetEach { index, head }) => {
2362 self.index_text = index.map(|mut index| {
2363 text::LineEnding::normalize(&mut index);
2364 Arc::new(index)
2365 });
2366 self.index_changed = true;
2367 self.head_text = head.map(|mut head| {
2368 text::LineEnding::normalize(&mut head);
2369 Arc::new(head)
2370 });
2371 self.head_changed = true;
2372 }
2373 None => {}
2374 }
2375
2376 self.recalculate_diffs(buffer, cx)
2377 }
2378
2379 fn recalculate_diffs(&mut self, buffer: text::BufferSnapshot, cx: &mut Context<Self>) {
2380 *self.recalculating_tx.borrow_mut() = true;
2381
2382 let language = self.language.clone();
2383 let language_registry = self.language_registry.clone();
2384 let unstaged_diff = self.unstaged_diff();
2385 let uncommitted_diff = self.uncommitted_diff();
2386 let head = self.head_text.clone();
2387 let index = self.index_text.clone();
2388 let index_changed = self.index_changed;
2389 let head_changed = self.head_changed;
2390 let language_changed = self.language_changed;
2391 let prev_hunk_staging_operation_count = self.hunk_staging_operation_count_as_of_write;
2392 let index_matches_head = match (self.index_text.as_ref(), self.head_text.as_ref()) {
2393 (Some(index), Some(head)) => Arc::ptr_eq(index, head),
2394 (None, None) => true,
2395 _ => false,
2396 };
2397 self.recalculate_diff_task = Some(cx.spawn(async move |this, cx| {
2398 log::debug!(
2399 "start recalculating diffs for buffer {}",
2400 buffer.remote_id()
2401 );
2402
2403 let mut new_unstaged_diff = None;
2404 if let Some(unstaged_diff) = &unstaged_diff {
2405 new_unstaged_diff = Some(
2406 BufferDiff::update_diff(
2407 unstaged_diff.clone(),
2408 buffer.clone(),
2409 index,
2410 index_changed,
2411 language_changed,
2412 language.clone(),
2413 language_registry.clone(),
2414 cx,
2415 )
2416 .await?,
2417 );
2418 }
2419
2420 let mut new_uncommitted_diff = None;
2421 if let Some(uncommitted_diff) = &uncommitted_diff {
2422 new_uncommitted_diff = if index_matches_head {
2423 new_unstaged_diff.clone()
2424 } else {
2425 Some(
2426 BufferDiff::update_diff(
2427 uncommitted_diff.clone(),
2428 buffer.clone(),
2429 head,
2430 head_changed,
2431 language_changed,
2432 language.clone(),
2433 language_registry.clone(),
2434 cx,
2435 )
2436 .await?,
2437 )
2438 }
2439 }
2440
2441 let cancel = this.update(cx, |this, _| {
2442 // This checks whether all pending stage/unstage operations
2443 // have quiesced (i.e. both the corresponding write and the
2444 // read of that write have completed). If not, then we cancel
2445 // this recalculation attempt to avoid invalidating pending
2446 // state too quickly; another recalculation will come along
2447 // later and clear the pending state once the state of the index has settled.
2448 if this.hunk_staging_operation_count > prev_hunk_staging_operation_count {
2449 *this.recalculating_tx.borrow_mut() = false;
2450 true
2451 } else {
2452 false
2453 }
2454 })?;
2455 if cancel {
2456 log::debug!(
2457 concat!(
2458 "aborting recalculating diffs for buffer {}",
2459 "due to subsequent hunk operations",
2460 ),
2461 buffer.remote_id()
2462 );
2463 return Ok(());
2464 }
2465
2466 let unstaged_changed_range = if let Some((unstaged_diff, new_unstaged_diff)) =
2467 unstaged_diff.as_ref().zip(new_unstaged_diff.clone())
2468 {
2469 unstaged_diff.update(cx, |diff, cx| {
2470 if language_changed {
2471 diff.language_changed(cx);
2472 }
2473 diff.set_snapshot(new_unstaged_diff, &buffer, cx)
2474 })?
2475 } else {
2476 None
2477 };
2478
2479 if let Some((uncommitted_diff, new_uncommitted_diff)) =
2480 uncommitted_diff.as_ref().zip(new_uncommitted_diff.clone())
2481 {
2482 uncommitted_diff.update(cx, |diff, cx| {
2483 if language_changed {
2484 diff.language_changed(cx);
2485 }
2486 diff.set_snapshot_with_secondary(
2487 new_uncommitted_diff,
2488 &buffer,
2489 unstaged_changed_range,
2490 true,
2491 cx,
2492 );
2493 })?;
2494 }
2495
2496 log::debug!(
2497 "finished recalculating diffs for buffer {}",
2498 buffer.remote_id()
2499 );
2500
2501 if let Some(this) = this.upgrade() {
2502 this.update(cx, |this, _| {
2503 this.index_changed = false;
2504 this.head_changed = false;
2505 this.language_changed = false;
2506 *this.recalculating_tx.borrow_mut() = false;
2507 })?;
2508 }
2509
2510 Ok(())
2511 }));
2512 }
2513}
2514
2515fn make_remote_delegate(
2516 this: Entity<GitStore>,
2517 project_id: u64,
2518 repository_id: RepositoryId,
2519 askpass_id: u64,
2520 cx: &mut AsyncApp,
2521) -> AskPassDelegate {
2522 AskPassDelegate::new(cx, move |prompt, tx, cx| {
2523 this.update(cx, |this, cx| {
2524 let Some((client, _)) = this.downstream_client() else {
2525 return;
2526 };
2527 let response = client.request(proto::AskPassRequest {
2528 project_id,
2529 repository_id: repository_id.to_proto(),
2530 askpass_id,
2531 prompt,
2532 });
2533 cx.spawn(async move |_, _| {
2534 tx.send(response.await?.response).ok();
2535 anyhow::Ok(())
2536 })
2537 .detach_and_log_err(cx);
2538 })
2539 .log_err();
2540 })
2541}
2542
2543impl RepositoryId {
2544 pub fn to_proto(self) -> u64 {
2545 self.0
2546 }
2547
2548 pub fn from_proto(id: u64) -> Self {
2549 RepositoryId(id)
2550 }
2551}
2552
2553impl RepositorySnapshot {
2554 fn empty(id: RepositoryId, work_directory_abs_path: Arc<Path>) -> Self {
2555 Self {
2556 id,
2557 statuses_by_path: Default::default(),
2558 work_directory_abs_path,
2559 branch: None,
2560 head_commit: None,
2561 scan_id: 0,
2562 merge: Default::default(),
2563 }
2564 }
2565
2566 fn initial_update(&self, project_id: u64) -> proto::UpdateRepository {
2567 proto::UpdateRepository {
2568 branch_summary: self.branch.as_ref().map(branch_to_proto),
2569 head_commit_details: self.head_commit.as_ref().map(commit_details_to_proto),
2570 updated_statuses: self
2571 .statuses_by_path
2572 .iter()
2573 .map(|entry| entry.to_proto())
2574 .collect(),
2575 removed_statuses: Default::default(),
2576 current_merge_conflicts: self
2577 .merge
2578 .conflicted_paths
2579 .iter()
2580 .map(|repo_path| repo_path.to_proto())
2581 .collect(),
2582 project_id,
2583 id: self.id.to_proto(),
2584 abs_path: self.work_directory_abs_path.to_proto(),
2585 entry_ids: vec![self.id.to_proto()],
2586 scan_id: self.scan_id,
2587 is_last_update: true,
2588 }
2589 }
2590
2591 fn build_update(&self, old: &Self, project_id: u64) -> proto::UpdateRepository {
2592 let mut updated_statuses: Vec<proto::StatusEntry> = Vec::new();
2593 let mut removed_statuses: Vec<String> = Vec::new();
2594
2595 let mut new_statuses = self.statuses_by_path.iter().peekable();
2596 let mut old_statuses = old.statuses_by_path.iter().peekable();
2597
2598 let mut current_new_entry = new_statuses.next();
2599 let mut current_old_entry = old_statuses.next();
2600 loop {
2601 match (current_new_entry, current_old_entry) {
2602 (Some(new_entry), Some(old_entry)) => {
2603 match new_entry.repo_path.cmp(&old_entry.repo_path) {
2604 Ordering::Less => {
2605 updated_statuses.push(new_entry.to_proto());
2606 current_new_entry = new_statuses.next();
2607 }
2608 Ordering::Equal => {
2609 if new_entry.status != old_entry.status {
2610 updated_statuses.push(new_entry.to_proto());
2611 }
2612 current_old_entry = old_statuses.next();
2613 current_new_entry = new_statuses.next();
2614 }
2615 Ordering::Greater => {
2616 removed_statuses.push(old_entry.repo_path.as_ref().to_proto());
2617 current_old_entry = old_statuses.next();
2618 }
2619 }
2620 }
2621 (None, Some(old_entry)) => {
2622 removed_statuses.push(old_entry.repo_path.as_ref().to_proto());
2623 current_old_entry = old_statuses.next();
2624 }
2625 (Some(new_entry), None) => {
2626 updated_statuses.push(new_entry.to_proto());
2627 current_new_entry = new_statuses.next();
2628 }
2629 (None, None) => break,
2630 }
2631 }
2632
2633 proto::UpdateRepository {
2634 branch_summary: self.branch.as_ref().map(branch_to_proto),
2635 head_commit_details: self.head_commit.as_ref().map(commit_details_to_proto),
2636 updated_statuses,
2637 removed_statuses,
2638 current_merge_conflicts: self
2639 .merge
2640 .conflicted_paths
2641 .iter()
2642 .map(|path| path.as_ref().to_proto())
2643 .collect(),
2644 project_id,
2645 id: self.id.to_proto(),
2646 abs_path: self.work_directory_abs_path.to_proto(),
2647 entry_ids: vec![],
2648 scan_id: self.scan_id,
2649 is_last_update: true,
2650 }
2651 }
2652
2653 pub fn status(&self) -> impl Iterator<Item = StatusEntry> + '_ {
2654 self.statuses_by_path.iter().cloned()
2655 }
2656
2657 pub fn status_summary(&self) -> GitSummary {
2658 self.statuses_by_path.summary().item_summary
2659 }
2660
2661 pub fn status_for_path(&self, path: &RepoPath) -> Option<StatusEntry> {
2662 self.statuses_by_path
2663 .get(&PathKey(path.0.clone()), &())
2664 .cloned()
2665 }
2666
2667 pub fn abs_path_to_repo_path(&self, abs_path: &Path) -> Option<RepoPath> {
2668 abs_path
2669 .strip_prefix(&self.work_directory_abs_path)
2670 .map(RepoPath::from)
2671 .ok()
2672 }
2673
2674 pub fn has_conflict(&self, repo_path: &RepoPath) -> bool {
2675 self.merge.conflicted_paths.contains(repo_path)
2676 }
2677
2678 /// This is the name that will be displayed in the repository selector for this repository.
2679 pub fn display_name(&self) -> SharedString {
2680 self.work_directory_abs_path
2681 .file_name()
2682 .unwrap_or_default()
2683 .to_string_lossy()
2684 .to_string()
2685 .into()
2686 }
2687}
2688
2689impl MergeDetails {
2690 async fn load(
2691 backend: &Arc<dyn GitRepository>,
2692 status: &SumTree<StatusEntry>,
2693 prev_snapshot: &RepositorySnapshot,
2694 ) -> Result<(MergeDetails, bool)> {
2695 let message = backend.merge_message().await;
2696 let heads = backend
2697 .revparse_batch(vec![
2698 "MERGE_HEAD".into(),
2699 "CHERRY_PICK_HEAD".into(),
2700 "REBASE_HEAD".into(),
2701 "REVERT_HEAD".into(),
2702 "APPLY_HEAD".into(),
2703 ])
2704 .await
2705 .log_err()
2706 .unwrap_or_default()
2707 .into_iter()
2708 .map(|opt| opt.map(SharedString::from))
2709 .collect::<Vec<_>>();
2710 let merge_heads_changed = heads != prev_snapshot.merge.heads;
2711 let conflicted_paths = if merge_heads_changed {
2712 TreeSet::from_ordered_entries(
2713 status
2714 .iter()
2715 .filter(|entry| entry.status.is_conflicted())
2716 .map(|entry| entry.repo_path.clone()),
2717 )
2718 } else {
2719 prev_snapshot.merge.conflicted_paths.clone()
2720 };
2721 let details = MergeDetails {
2722 conflicted_paths,
2723 message: message.map(SharedString::from),
2724 heads,
2725 };
2726 Ok((details, merge_heads_changed))
2727 }
2728}
2729
2730impl Repository {
2731 pub fn snapshot(&self) -> RepositorySnapshot {
2732 self.snapshot.clone()
2733 }
2734
2735 fn local(
2736 id: RepositoryId,
2737 work_directory_abs_path: Arc<Path>,
2738 dot_git_abs_path: Arc<Path>,
2739 repository_dir_abs_path: Arc<Path>,
2740 common_dir_abs_path: Arc<Path>,
2741 project_environment: WeakEntity<ProjectEnvironment>,
2742 fs: Arc<dyn Fs>,
2743 git_store: WeakEntity<GitStore>,
2744 cx: &mut Context<Self>,
2745 ) -> Self {
2746 let snapshot = RepositorySnapshot::empty(id, work_directory_abs_path.clone());
2747 Repository {
2748 this: cx.weak_entity(),
2749 git_store,
2750 snapshot,
2751 commit_message_buffer: None,
2752 askpass_delegates: Default::default(),
2753 paths_needing_status_update: Default::default(),
2754 latest_askpass_id: 0,
2755 job_sender: Repository::spawn_local_git_worker(
2756 work_directory_abs_path,
2757 dot_git_abs_path,
2758 repository_dir_abs_path,
2759 common_dir_abs_path,
2760 project_environment,
2761 fs,
2762 cx,
2763 ),
2764 job_id: 0,
2765 active_jobs: Default::default(),
2766 }
2767 }
2768
2769 fn remote(
2770 id: RepositoryId,
2771 work_directory_abs_path: Arc<Path>,
2772 project_id: ProjectId,
2773 client: AnyProtoClient,
2774 git_store: WeakEntity<GitStore>,
2775 cx: &mut Context<Self>,
2776 ) -> Self {
2777 let snapshot = RepositorySnapshot::empty(id, work_directory_abs_path);
2778 Self {
2779 this: cx.weak_entity(),
2780 snapshot,
2781 commit_message_buffer: None,
2782 git_store,
2783 paths_needing_status_update: Default::default(),
2784 job_sender: Self::spawn_remote_git_worker(project_id, client, cx),
2785 askpass_delegates: Default::default(),
2786 latest_askpass_id: 0,
2787 active_jobs: Default::default(),
2788 job_id: 0,
2789 }
2790 }
2791
2792 pub fn git_store(&self) -> Option<Entity<GitStore>> {
2793 self.git_store.upgrade()
2794 }
2795
    /// Enqueues a job (keyed by `GitJobKey::ReloadBufferDiffBases`, with no
    /// status message) that reloads the index and head texts for every buffer
    /// with a tracked diff state that lives inside this repository.
    ///
    /// The job runs in three stages:
    /// 1. On the entity side, collect `(buffer, repo_path, index, head)` for
    ///    each diff state whose buffer maps into this repository. `index` /
    ///    `head` are `Some(current text)` only when the corresponding diff is
    ///    still alive.
    /// 2. On a background task, load the fresh texts from the backend and
    ///    work out which `DiffBasesChange` (if any) describes the difference.
    /// 3. Back on the git store, forward each change to the downstream client
    ///    (when there is one) and apply it to the diff state.
    ///
    /// Only local repositories are supported; for any other state an error is
    /// logged and the job does nothing.
    fn reload_buffer_diff_bases(&mut self, cx: &mut Context<Self>) {
        let this = cx.weak_entity();
        let git_store = self.git_store.clone();
        // The result receiver is intentionally discarded.
        let _ = self.send_keyed_job(
            Some(GitJobKey::ReloadBufferDiffBases),
            None,
            |state, mut cx| async move {
                let RepositoryState::Local { backend, .. } = state else {
                    log::error!("tried to recompute diffs for a non-local repository");
                    return Ok(());
                };

                let Some(this) = this.upgrade() else {
                    return Ok(());
                };

                // Stage 1: snapshot which buffers need reloading and what
                // base texts they currently hold.
                let repo_diff_state_updates = this.update(&mut cx, |this, cx| {
                    git_store.update(cx, |git_store, cx| {
                        git_store
                            .diffs
                            .iter()
                            .filter_map(|(buffer_id, diff_state)| {
                                let buffer_store = git_store.buffer_store.read(cx);
                                let buffer = buffer_store.get(*buffer_id)?;
                                let file = File::from_dyn(buffer.read(cx).file())?;
                                let abs_path =
                                    file.worktree.read(cx).absolutize(&file.path).ok()?;
                                // Buffers outside this repository are skipped.
                                let repo_path = this.abs_path_to_repo_path(&abs_path)?;
                                log::debug!(
                                    "start reload diff bases for repo path {}",
                                    repo_path.0.display()
                                );
                                diff_state.update(cx, |diff_state, _| {
                                    let has_unstaged_diff = diff_state
                                        .unstaged_diff
                                        .as_ref()
                                        .is_some_and(|diff| diff.is_upgradable());
                                    let has_uncommitted_diff = diff_state
                                        .uncommitted_diff
                                        .as_ref()
                                        .is_some_and(|set| set.is_upgradable());

                                    // Outer `Option`: is this base tracked at
                                    // all; inner value: its current text.
                                    Some((
                                        buffer,
                                        repo_path,
                                        has_unstaged_diff.then(|| diff_state.index_text.clone()),
                                        has_uncommitted_diff.then(|| diff_state.head_text.clone()),
                                    ))
                                })
                            })
                            .collect::<Vec<_>>()
                    })
                })??;

                // Stage 2: load fresh texts and classify the change.
                let buffer_diff_base_changes = cx
                    .background_spawn(async move {
                        let mut changes = Vec::new();
                        for (buffer, repo_path, current_index_text, current_head_text) in
                            &repo_diff_state_updates
                        {
                            // Only load the texts that are actually tracked.
                            let index_text = if current_index_text.is_some() {
                                backend.load_index_text(repo_path.clone()).await
                            } else {
                                None
                            };
                            let head_text = if current_head_text.is_some() {
                                backend.load_committed_text(repo_path.clone()).await
                            } else {
                                None
                            };

                            let change =
                                match (current_index_text.as_ref(), current_head_text.as_ref()) {
                                    (Some(current_index), Some(current_head)) => {
                                        let index_changed =
                                            index_text.as_ref() != current_index.as_deref();
                                        let head_changed =
                                            head_text.as_ref() != current_head.as_deref();
                                        if index_changed && head_changed {
                                            // Identical new texts are sent as
                                            // one shared value.
                                            if index_text == head_text {
                                                Some(DiffBasesChange::SetBoth(head_text))
                                            } else {
                                                Some(DiffBasesChange::SetEach {
                                                    index: index_text,
                                                    head: head_text,
                                                })
                                            }
                                        } else if index_changed {
                                            Some(DiffBasesChange::SetIndex(index_text))
                                        } else if head_changed {
                                            Some(DiffBasesChange::SetHead(head_text))
                                        } else {
                                            None
                                        }
                                    }
                                    (Some(current_index), None) => {
                                        let index_changed =
                                            index_text.as_ref() != current_index.as_deref();
                                        index_changed
                                            .then_some(DiffBasesChange::SetIndex(index_text))
                                    }
                                    (None, Some(current_head)) => {
                                        let head_changed =
                                            head_text.as_ref() != current_head.as_deref();
                                        head_changed.then_some(DiffBasesChange::SetHead(head_text))
                                    }
                                    (None, None) => None,
                                };

                            changes.push((buffer.clone(), change))
                        }
                        changes
                    })
                    .await;

                // Stage 3: publish and apply the changes.
                git_store.update(&mut cx, |git_store, cx| {
                    for (buffer, diff_bases_change) in buffer_diff_base_changes {
                        let buffer_snapshot = buffer.read(cx).text_snapshot();
                        let buffer_id = buffer_snapshot.remote_id();
                        // The diff state may have gone away while loading.
                        let Some(diff_state) = git_store.diffs.get(&buffer_id) else {
                            continue;
                        };

                        let downstream_client = git_store.downstream_client();
                        diff_state.update(cx, |diff_state, cx| {
                            use proto::update_diff_bases::Mode;

                            // Notify the downstream client only when there is
                            // both a change and a client.
                            if let Some((diff_bases_change, (client, project_id))) =
                                diff_bases_change.clone().zip(downstream_client)
                            {
                                let (staged_text, committed_text, mode) = match diff_bases_change {
                                    DiffBasesChange::SetIndex(index) => {
                                        (index, None, Mode::IndexOnly)
                                    }
                                    DiffBasesChange::SetHead(head) => (None, head, Mode::HeadOnly),
                                    DiffBasesChange::SetEach { index, head } => {
                                        (index, head, Mode::IndexAndHead)
                                    }
                                    DiffBasesChange::SetBoth(text) => {
                                        (None, text, Mode::IndexMatchesHead)
                                    }
                                };
                                client
                                    .send(proto::UpdateDiffBases {
                                        project_id: project_id.to_proto(),
                                        buffer_id: buffer_id.to_proto(),
                                        staged_text,
                                        committed_text,
                                        mode: mode as i32,
                                    })
                                    .log_err();
                            }

                            // Called even when the change is `None`, which
                            // still triggers a diff recalculation.
                            diff_state.diff_bases_changed(buffer_snapshot, diff_bases_change, cx);
                        });
                    }
                })
            },
        );
    }
2956
2957 pub fn send_job<F, Fut, R>(
2958 &mut self,
2959 status: Option<SharedString>,
2960 job: F,
2961 ) -> oneshot::Receiver<R>
2962 where
2963 F: FnOnce(RepositoryState, AsyncApp) -> Fut + 'static,
2964 Fut: Future<Output = R> + 'static,
2965 R: Send + 'static,
2966 {
2967 self.send_keyed_job(None, status, job)
2968 }
2969
2970 fn send_keyed_job<F, Fut, R>(
2971 &mut self,
2972 key: Option<GitJobKey>,
2973 status: Option<SharedString>,
2974 job: F,
2975 ) -> oneshot::Receiver<R>
2976 where
2977 F: FnOnce(RepositoryState, AsyncApp) -> Fut + 'static,
2978 Fut: Future<Output = R> + 'static,
2979 R: Send + 'static,
2980 {
2981 let (result_tx, result_rx) = futures::channel::oneshot::channel();
2982 let job_id = post_inc(&mut self.job_id);
2983 let this = self.this.clone();
2984 self.job_sender
2985 .unbounded_send(GitJob {
2986 key,
2987 job: Box::new(move |state, cx: &mut AsyncApp| {
2988 let job = job(state, cx.clone());
2989 cx.spawn(async move |cx| {
2990 if let Some(s) = status.clone() {
2991 this.update(cx, |this, cx| {
2992 this.active_jobs.insert(
2993 job_id,
2994 JobInfo {
2995 start: Instant::now(),
2996 message: s.clone(),
2997 },
2998 );
2999
3000 cx.notify();
3001 })
3002 .ok();
3003 }
3004 let result = job.await;
3005
3006 this.update(cx, |this, cx| {
3007 this.active_jobs.remove(&job_id);
3008 cx.notify();
3009 })
3010 .ok();
3011
3012 result_tx.send(result).ok();
3013 })
3014 }),
3015 })
3016 .ok();
3017 result_rx
3018 }
3019
3020 pub fn set_as_active_repository(&self, cx: &mut Context<Self>) {
3021 let Some(git_store) = self.git_store.upgrade() else {
3022 return;
3023 };
3024 let entity = cx.entity();
3025 git_store.update(cx, |git_store, cx| {
3026 let Some((&id, _)) = git_store
3027 .repositories
3028 .iter()
3029 .find(|(_, handle)| *handle == &entity)
3030 else {
3031 return;
3032 };
3033 git_store.active_repo_id = Some(id);
3034 cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
3035 });
3036 }
3037
3038 pub fn cached_status(&self) -> impl '_ + Iterator<Item = StatusEntry> {
3039 self.snapshot.status()
3040 }
3041
3042 pub fn repo_path_to_project_path(&self, path: &RepoPath, cx: &App) -> Option<ProjectPath> {
3043 let git_store = self.git_store.upgrade()?;
3044 let worktree_store = git_store.read(cx).worktree_store.read(cx);
3045 let abs_path = self.snapshot.work_directory_abs_path.join(&path.0);
3046 let (worktree, relative_path) = worktree_store.find_worktree(abs_path, cx)?;
3047 Some(ProjectPath {
3048 worktree_id: worktree.read(cx).id(),
3049 path: relative_path.into(),
3050 })
3051 }
3052
3053 pub fn project_path_to_repo_path(&self, path: &ProjectPath, cx: &App) -> Option<RepoPath> {
3054 let git_store = self.git_store.upgrade()?;
3055 let worktree_store = git_store.read(cx).worktree_store.read(cx);
3056 let abs_path = worktree_store.absolutize(path, cx)?;
3057 self.snapshot.abs_path_to_repo_path(&abs_path)
3058 }
3059
3060 pub fn contains_sub_repo(&self, other: &Entity<Self>, cx: &App) -> bool {
3061 other
3062 .read(cx)
3063 .snapshot
3064 .work_directory_abs_path
3065 .starts_with(&self.snapshot.work_directory_abs_path)
3066 }
3067
    /// Returns the buffer used for editing this repository's commit message,
    /// creating it on first use.
    ///
    /// If a commit message buffer is already cached it is returned
    /// immediately. Otherwise a job is queued on the git worker:
    /// - for a local repository the buffer is created through
    ///   `open_local_commit_buffer`;
    /// - for a remote repository an `OpenCommitMessageBuffer` request is sent,
    ///   the referenced remote buffer is awaited, its language is set to
    ///   "Git Commit" when a language registry is supplied, and the buffer is
    ///   cached on the repository.
    pub fn open_commit_buffer(
        &mut self,
        languages: Option<Arc<LanguageRegistry>>,
        buffer_store: Entity<BufferStore>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<Buffer>>> {
        let id = self.id;
        // Fast path: reuse the cached buffer.
        if let Some(buffer) = self.commit_message_buffer.clone() {
            return Task::ready(Ok(buffer));
        }
        let this = cx.weak_entity();

        let rx = self.send_job(None, move |state, mut cx| async move {
            // NOTE(review): `this` is the weak handle of this repository, so
            // the message below seems to name the wrong entity — confirm.
            let Some(this) = this.upgrade() else {
                bail!("git store was dropped");
            };
            match state {
                RepositoryState::Local { .. } => {
                    this.update(&mut cx, |_, cx| {
                        Self::open_local_commit_buffer(languages, buffer_store, cx)
                    })?
                    .await
                }
                RepositoryState::Remote { project_id, client } => {
                    let request = client.request(proto::OpenCommitMessageBuffer {
                        project_id: project_id.0,
                        repository_id: id.to_proto(),
                    });
                    let response = request.await.context("requesting to open commit buffer")?;
                    let buffer_id = BufferId::new(response.buffer_id)?;
                    let buffer = buffer_store
                        .update(&mut cx, |buffer_store, cx| {
                            buffer_store.wait_for_remote_buffer(buffer_id, cx)
                        })?
                        .await?;
                    if let Some(language_registry) = languages {
                        let git_commit_language =
                            language_registry.language_for_name("Git Commit").await?;
                        buffer.update(&mut cx, |buffer, cx| {
                            buffer.set_language(Some(git_commit_language), cx);
                        })?;
                    }
                    // Cache the buffer so later calls take the fast path.
                    this.update(&mut cx, |this, _| {
                        this.commit_message_buffer = Some(buffer.clone());
                    })?;
                    Ok(buffer)
                }
            }
        });

        // `rx.await?` surfaces a cancelled job as an error and yields the
        // job's own `Result` otherwise.
        cx.spawn(|_, _: &mut AsyncApp| async move { rx.await? })
    }
3120
3121 fn open_local_commit_buffer(
3122 language_registry: Option<Arc<LanguageRegistry>>,
3123 buffer_store: Entity<BufferStore>,
3124 cx: &mut Context<Self>,
3125 ) -> Task<Result<Entity<Buffer>>> {
3126 cx.spawn(async move |repository, cx| {
3127 let buffer = buffer_store
3128 .update(cx, |buffer_store, cx| buffer_store.create_buffer(cx))?
3129 .await?;
3130
3131 if let Some(language_registry) = language_registry {
3132 let git_commit_language = language_registry.language_for_name("Git Commit").await?;
3133 buffer.update(cx, |buffer, cx| {
3134 buffer.set_language(Some(git_commit_language), cx);
3135 })?;
3136 }
3137
3138 repository.update(cx, |repository, _| {
3139 repository.commit_message_buffer = Some(buffer.clone());
3140 })?;
3141 Ok(buffer)
3142 })
3143 }
3144
3145 pub fn checkout_files(
3146 &mut self,
3147 commit: &str,
3148 paths: Vec<RepoPath>,
3149 _cx: &mut App,
3150 ) -> oneshot::Receiver<Result<()>> {
3151 let commit = commit.to_string();
3152 let id = self.id;
3153
3154 self.send_job(
3155 Some(format!("git checkout {}", commit).into()),
3156 move |git_repo, _| async move {
3157 match git_repo {
3158 RepositoryState::Local {
3159 backend,
3160 environment,
3161 ..
3162 } => {
3163 backend
3164 .checkout_files(commit, paths, environment.clone())
3165 .await
3166 }
3167 RepositoryState::Remote { project_id, client } => {
3168 client
3169 .request(proto::GitCheckoutFiles {
3170 project_id: project_id.0,
3171 repository_id: id.to_proto(),
3172 commit,
3173 paths: paths
3174 .into_iter()
3175 .map(|p| p.to_string_lossy().to_string())
3176 .collect(),
3177 })
3178 .await?;
3179
3180 Ok(())
3181 }
3182 }
3183 },
3184 )
3185 }
3186
3187 pub fn reset(
3188 &mut self,
3189 commit: String,
3190 reset_mode: ResetMode,
3191 _cx: &mut App,
3192 ) -> oneshot::Receiver<Result<()>> {
3193 let commit = commit.to_string();
3194 let id = self.id;
3195
3196 self.send_job(None, move |git_repo, _| async move {
3197 match git_repo {
3198 RepositoryState::Local {
3199 backend,
3200 environment,
3201 ..
3202 } => backend.reset(commit, reset_mode, environment).await,
3203 RepositoryState::Remote { project_id, client } => {
3204 client
3205 .request(proto::GitReset {
3206 project_id: project_id.0,
3207 repository_id: id.to_proto(),
3208 commit,
3209 mode: match reset_mode {
3210 ResetMode::Soft => git_reset::ResetMode::Soft.into(),
3211 ResetMode::Mixed => git_reset::ResetMode::Mixed.into(),
3212 },
3213 })
3214 .await?;
3215
3216 Ok(())
3217 }
3218 }
3219 })
3220 }
3221
3222 pub fn show(&mut self, commit: String) -> oneshot::Receiver<Result<CommitDetails>> {
3223 let id = self.id;
3224 self.send_job(None, move |git_repo, _cx| async move {
3225 match git_repo {
3226 RepositoryState::Local { backend, .. } => backend.show(commit).await,
3227 RepositoryState::Remote { project_id, client } => {
3228 let resp = client
3229 .request(proto::GitShow {
3230 project_id: project_id.0,
3231 repository_id: id.to_proto(),
3232 commit,
3233 })
3234 .await?;
3235
3236 Ok(CommitDetails {
3237 sha: resp.sha.into(),
3238 message: resp.message.into(),
3239 commit_timestamp: resp.commit_timestamp,
3240 author_email: resp.author_email.into(),
3241 author_name: resp.author_name.into(),
3242 })
3243 }
3244 }
3245 })
3246 }
3247
3248 pub fn load_commit_diff(&mut self, commit: String) -> oneshot::Receiver<Result<CommitDiff>> {
3249 let id = self.id;
3250 self.send_job(None, move |git_repo, cx| async move {
3251 match git_repo {
3252 RepositoryState::Local { backend, .. } => backend.load_commit(commit, cx).await,
3253 RepositoryState::Remote {
3254 client, project_id, ..
3255 } => {
3256 let response = client
3257 .request(proto::LoadCommitDiff {
3258 project_id: project_id.0,
3259 repository_id: id.to_proto(),
3260 commit,
3261 })
3262 .await?;
3263 Ok(CommitDiff {
3264 files: response
3265 .files
3266 .into_iter()
3267 .map(|file| CommitFile {
3268 path: Path::new(&file.path).into(),
3269 old_text: file.old_text,
3270 new_text: file.new_text,
3271 })
3272 .collect(),
3273 })
3274 }
3275 }
3276 })
3277 }
3278
3279 fn buffer_store(&self, cx: &App) -> Option<Entity<BufferStore>> {
3280 Some(self.git_store.upgrade()?.read(cx).buffer_store.clone())
3281 }
3282
3283 pub fn stage_entries(
3284 &self,
3285 entries: Vec<RepoPath>,
3286 cx: &mut Context<Self>,
3287 ) -> Task<anyhow::Result<()>> {
3288 if entries.is_empty() {
3289 return Task::ready(Ok(()));
3290 }
3291 let id = self.id;
3292
3293 let mut save_futures = Vec::new();
3294 if let Some(buffer_store) = self.buffer_store(cx) {
3295 buffer_store.update(cx, |buffer_store, cx| {
3296 for path in &entries {
3297 let Some(project_path) = self.repo_path_to_project_path(path, cx) else {
3298 continue;
3299 };
3300 if let Some(buffer) = buffer_store.get_by_path(&project_path, cx) {
3301 if buffer
3302 .read(cx)
3303 .file()
3304 .map_or(false, |file| file.disk_state().exists())
3305 {
3306 save_futures.push(buffer_store.save_buffer(buffer, cx));
3307 }
3308 }
3309 }
3310 })
3311 }
3312
3313 cx.spawn(async move |this, cx| {
3314 for save_future in save_futures {
3315 save_future.await?;
3316 }
3317
3318 this.update(cx, |this, _| {
3319 this.send_job(None, move |git_repo, _cx| async move {
3320 match git_repo {
3321 RepositoryState::Local {
3322 backend,
3323 environment,
3324 ..
3325 } => backend.stage_paths(entries, environment.clone()).await,
3326 RepositoryState::Remote { project_id, client } => {
3327 client
3328 .request(proto::Stage {
3329 project_id: project_id.0,
3330 repository_id: id.to_proto(),
3331 paths: entries
3332 .into_iter()
3333 .map(|repo_path| repo_path.as_ref().to_proto())
3334 .collect(),
3335 })
3336 .await
3337 .context("sending stage request")?;
3338
3339 Ok(())
3340 }
3341 }
3342 })
3343 })?
3344 .await??;
3345
3346 Ok(())
3347 })
3348 }
3349
3350 pub fn unstage_entries(
3351 &self,
3352 entries: Vec<RepoPath>,
3353 cx: &mut Context<Self>,
3354 ) -> Task<anyhow::Result<()>> {
3355 if entries.is_empty() {
3356 return Task::ready(Ok(()));
3357 }
3358 let id = self.id;
3359
3360 let mut save_futures = Vec::new();
3361 if let Some(buffer_store) = self.buffer_store(cx) {
3362 buffer_store.update(cx, |buffer_store, cx| {
3363 for path in &entries {
3364 let Some(project_path) = self.repo_path_to_project_path(path, cx) else {
3365 continue;
3366 };
3367 if let Some(buffer) = buffer_store.get_by_path(&project_path, cx) {
3368 if buffer
3369 .read(cx)
3370 .file()
3371 .map_or(false, |file| file.disk_state().exists())
3372 {
3373 save_futures.push(buffer_store.save_buffer(buffer, cx));
3374 }
3375 }
3376 }
3377 })
3378 }
3379
3380 cx.spawn(async move |this, cx| {
3381 for save_future in save_futures {
3382 save_future.await?;
3383 }
3384
3385 this.update(cx, |this, _| {
3386 this.send_job(None, move |git_repo, _cx| async move {
3387 match git_repo {
3388 RepositoryState::Local {
3389 backend,
3390 environment,
3391 ..
3392 } => backend.unstage_paths(entries, environment).await,
3393 RepositoryState::Remote { project_id, client } => {
3394 client
3395 .request(proto::Unstage {
3396 project_id: project_id.0,
3397 repository_id: id.to_proto(),
3398 paths: entries
3399 .into_iter()
3400 .map(|repo_path| repo_path.as_ref().to_proto())
3401 .collect(),
3402 })
3403 .await
3404 .context("sending unstage request")?;
3405
3406 Ok(())
3407 }
3408 }
3409 })
3410 })?
3411 .await??;
3412
3413 Ok(())
3414 })
3415 }
3416
3417 pub fn stage_all(&self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
3418 let to_stage = self
3419 .cached_status()
3420 .filter(|entry| !entry.status.staging().is_fully_staged())
3421 .map(|entry| entry.repo_path.clone())
3422 .collect();
3423 self.stage_entries(to_stage, cx)
3424 }
3425
3426 pub fn unstage_all(&self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
3427 let to_unstage = self
3428 .cached_status()
3429 .filter(|entry| entry.status.staging().has_staged())
3430 .map(|entry| entry.repo_path.clone())
3431 .collect();
3432 self.unstage_entries(to_unstage, cx)
3433 }
3434
3435 pub fn commit(
3436 &mut self,
3437 message: SharedString,
3438 name_and_email: Option<(SharedString, SharedString)>,
3439 options: CommitOptions,
3440 _cx: &mut App,
3441 ) -> oneshot::Receiver<Result<()>> {
3442 let id = self.id;
3443
3444 self.send_job(Some("git commit".into()), move |git_repo, _cx| async move {
3445 match git_repo {
3446 RepositoryState::Local {
3447 backend,
3448 environment,
3449 ..
3450 } => {
3451 backend
3452 .commit(message, name_and_email, options, environment)
3453 .await
3454 }
3455 RepositoryState::Remote { project_id, client } => {
3456 let (name, email) = name_and_email.unzip();
3457 client
3458 .request(proto::Commit {
3459 project_id: project_id.0,
3460 repository_id: id.to_proto(),
3461 message: String::from(message),
3462 name: name.map(String::from),
3463 email: email.map(String::from),
3464 options: Some(proto::commit::CommitOptions {
3465 amend: options.amend,
3466 }),
3467 })
3468 .await
3469 .context("sending commit request")?;
3470
3471 Ok(())
3472 }
3473 }
3474 })
3475 }
3476
3477 pub fn fetch(
3478 &mut self,
3479 askpass: AskPassDelegate,
3480 _cx: &mut App,
3481 ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
3482 let askpass_delegates = self.askpass_delegates.clone();
3483 let askpass_id = util::post_inc(&mut self.latest_askpass_id);
3484 let id = self.id;
3485
3486 self.send_job(Some("git fetch".into()), move |git_repo, cx| async move {
3487 match git_repo {
3488 RepositoryState::Local {
3489 backend,
3490 environment,
3491 ..
3492 } => backend.fetch(askpass, environment, cx).await,
3493 RepositoryState::Remote { project_id, client } => {
3494 askpass_delegates.lock().insert(askpass_id, askpass);
3495 let _defer = util::defer(|| {
3496 let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
3497 debug_assert!(askpass_delegate.is_some());
3498 });
3499
3500 let response = client
3501 .request(proto::Fetch {
3502 project_id: project_id.0,
3503 repository_id: id.to_proto(),
3504 askpass_id,
3505 })
3506 .await
3507 .context("sending fetch request")?;
3508
3509 Ok(RemoteCommandOutput {
3510 stdout: response.stdout,
3511 stderr: response.stderr,
3512 })
3513 }
3514 }
3515 })
3516 }
3517
3518 pub fn push(
3519 &mut self,
3520 branch: SharedString,
3521 remote: SharedString,
3522 options: Option<PushOptions>,
3523 askpass: AskPassDelegate,
3524 cx: &mut Context<Self>,
3525 ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
3526 let askpass_delegates = self.askpass_delegates.clone();
3527 let askpass_id = util::post_inc(&mut self.latest_askpass_id);
3528 let id = self.id;
3529
3530 let args = options
3531 .map(|option| match option {
3532 PushOptions::SetUpstream => " --set-upstream",
3533 PushOptions::Force => " --force",
3534 })
3535 .unwrap_or("");
3536
3537 let updates_tx = self
3538 .git_store()
3539 .and_then(|git_store| match &git_store.read(cx).state {
3540 GitStoreState::Local { downstream, .. } => downstream
3541 .as_ref()
3542 .map(|downstream| downstream.updates_tx.clone()),
3543 _ => None,
3544 });
3545
3546 let this = cx.weak_entity();
3547 self.send_job(
3548 Some(format!("git push {} {} {}", args, branch, remote).into()),
3549 move |git_repo, mut cx| async move {
3550 match git_repo {
3551 RepositoryState::Local {
3552 backend,
3553 environment,
3554 ..
3555 } => {
3556 let result = backend
3557 .push(
3558 branch.to_string(),
3559 remote.to_string(),
3560 options,
3561 askpass,
3562 environment.clone(),
3563 cx.clone(),
3564 )
3565 .await;
3566 if result.is_ok() {
3567 let branches = backend.branches().await?;
3568 let branch = branches.into_iter().find(|branch| branch.is_head);
3569 log::info!("head branch after scan is {branch:?}");
3570 let snapshot = this.update(&mut cx, |this, cx| {
3571 this.snapshot.branch = branch;
3572 let snapshot = this.snapshot.clone();
3573 cx.emit(RepositoryEvent::Updated { full_scan: false });
3574 snapshot
3575 })?;
3576 if let Some(updates_tx) = updates_tx {
3577 updates_tx
3578 .unbounded_send(DownstreamUpdate::UpdateRepository(snapshot))
3579 .ok();
3580 }
3581 }
3582 result
3583 }
3584 RepositoryState::Remote { project_id, client } => {
3585 askpass_delegates.lock().insert(askpass_id, askpass);
3586 let _defer = util::defer(|| {
3587 let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
3588 debug_assert!(askpass_delegate.is_some());
3589 });
3590 let response = client
3591 .request(proto::Push {
3592 project_id: project_id.0,
3593 repository_id: id.to_proto(),
3594 askpass_id,
3595 branch_name: branch.to_string(),
3596 remote_name: remote.to_string(),
3597 options: options.map(|options| match options {
3598 PushOptions::Force => proto::push::PushOptions::Force,
3599 PushOptions::SetUpstream => {
3600 proto::push::PushOptions::SetUpstream
3601 }
3602 }
3603 as i32),
3604 })
3605 .await
3606 .context("sending push request")?;
3607
3608 Ok(RemoteCommandOutput {
3609 stdout: response.stdout,
3610 stderr: response.stderr,
3611 })
3612 }
3613 }
3614 },
3615 )
3616 }
3617
3618 pub fn pull(
3619 &mut self,
3620 branch: SharedString,
3621 remote: SharedString,
3622 askpass: AskPassDelegate,
3623 _cx: &mut App,
3624 ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
3625 let askpass_delegates = self.askpass_delegates.clone();
3626 let askpass_id = util::post_inc(&mut self.latest_askpass_id);
3627 let id = self.id;
3628
3629 self.send_job(
3630 Some(format!("git pull {} {}", remote, branch).into()),
3631 move |git_repo, cx| async move {
3632 match git_repo {
3633 RepositoryState::Local {
3634 backend,
3635 environment,
3636 ..
3637 } => {
3638 backend
3639 .pull(
3640 branch.to_string(),
3641 remote.to_string(),
3642 askpass,
3643 environment.clone(),
3644 cx,
3645 )
3646 .await
3647 }
3648 RepositoryState::Remote { project_id, client } => {
3649 askpass_delegates.lock().insert(askpass_id, askpass);
3650 let _defer = util::defer(|| {
3651 let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
3652 debug_assert!(askpass_delegate.is_some());
3653 });
3654 let response = client
3655 .request(proto::Pull {
3656 project_id: project_id.0,
3657 repository_id: id.to_proto(),
3658 askpass_id,
3659 branch_name: branch.to_string(),
3660 remote_name: remote.to_string(),
3661 })
3662 .await
3663 .context("sending pull request")?;
3664
3665 Ok(RemoteCommandOutput {
3666 stdout: response.stdout,
3667 stderr: response.stderr,
3668 })
3669 }
3670 }
3671 },
3672 )
3673 }
3674
3675 fn spawn_set_index_text_job(
3676 &mut self,
3677 path: RepoPath,
3678 content: Option<String>,
3679 hunk_staging_operation_count: Option<usize>,
3680 cx: &mut Context<Self>,
3681 ) -> oneshot::Receiver<anyhow::Result<()>> {
3682 let id = self.id;
3683 let this = cx.weak_entity();
3684 let git_store = self.git_store.clone();
3685 self.send_keyed_job(
3686 Some(GitJobKey::WriteIndex(path.clone())),
3687 None,
3688 move |git_repo, mut cx| async move {
3689 log::debug!("start updating index text for buffer {}", path.display());
3690 match git_repo {
3691 RepositoryState::Local {
3692 backend,
3693 environment,
3694 ..
3695 } => {
3696 backend
3697 .set_index_text(path.clone(), content, environment.clone())
3698 .await?;
3699 }
3700 RepositoryState::Remote { project_id, client } => {
3701 client
3702 .request(proto::SetIndexText {
3703 project_id: project_id.0,
3704 repository_id: id.to_proto(),
3705 path: path.as_ref().to_proto(),
3706 text: content,
3707 })
3708 .await?;
3709 }
3710 }
3711 log::debug!("finish updating index text for buffer {}", path.display());
3712
3713 if let Some(hunk_staging_operation_count) = hunk_staging_operation_count {
3714 let project_path = this
3715 .read_with(&cx, |this, cx| this.repo_path_to_project_path(&path, cx))
3716 .ok()
3717 .flatten();
3718 git_store.update(&mut cx, |git_store, cx| {
3719 let buffer_id = git_store
3720 .buffer_store
3721 .read(cx)
3722 .get_by_path(&project_path?, cx)?
3723 .read(cx)
3724 .remote_id();
3725 let diff_state = git_store.diffs.get(&buffer_id)?;
3726 diff_state.update(cx, |diff_state, _| {
3727 diff_state.hunk_staging_operation_count_as_of_write =
3728 hunk_staging_operation_count;
3729 });
3730 Some(())
3731 })?;
3732 }
3733 Ok(())
3734 },
3735 )
3736 }
3737
3738 pub fn get_remotes(
3739 &mut self,
3740 branch_name: Option<String>,
3741 ) -> oneshot::Receiver<Result<Vec<Remote>>> {
3742 let id = self.id;
3743 self.send_job(None, move |repo, _cx| async move {
3744 match repo {
3745 RepositoryState::Local { backend, .. } => backend.get_remotes(branch_name).await,
3746 RepositoryState::Remote { project_id, client } => {
3747 let response = client
3748 .request(proto::GetRemotes {
3749 project_id: project_id.0,
3750 repository_id: id.to_proto(),
3751 branch_name,
3752 })
3753 .await?;
3754
3755 let remotes = response
3756 .remotes
3757 .into_iter()
3758 .map(|remotes| git::repository::Remote {
3759 name: remotes.name.into(),
3760 })
3761 .collect();
3762
3763 Ok(remotes)
3764 }
3765 }
3766 })
3767 }
3768
3769 pub fn branches(&mut self) -> oneshot::Receiver<Result<Vec<Branch>>> {
3770 let id = self.id;
3771 self.send_job(None, move |repo, cx| async move {
3772 match repo {
3773 RepositoryState::Local { backend, .. } => {
3774 let backend = backend.clone();
3775 cx.background_spawn(async move { backend.branches().await })
3776 .await
3777 }
3778 RepositoryState::Remote { project_id, client } => {
3779 let response = client
3780 .request(proto::GitGetBranches {
3781 project_id: project_id.0,
3782 repository_id: id.to_proto(),
3783 })
3784 .await?;
3785
3786 let branches = response
3787 .branches
3788 .into_iter()
3789 .map(|branch| proto_to_branch(&branch))
3790 .collect();
3791
3792 Ok(branches)
3793 }
3794 }
3795 })
3796 }
3797
3798 pub fn diff(&mut self, diff_type: DiffType, _cx: &App) -> oneshot::Receiver<Result<String>> {
3799 let id = self.id;
3800 self.send_job(None, move |repo, _cx| async move {
3801 match repo {
3802 RepositoryState::Local { backend, .. } => backend.diff(diff_type).await,
3803 RepositoryState::Remote { project_id, client } => {
3804 let response = client
3805 .request(proto::GitDiff {
3806 project_id: project_id.0,
3807 repository_id: id.to_proto(),
3808 diff_type: match diff_type {
3809 DiffType::HeadToIndex => {
3810 proto::git_diff::DiffType::HeadToIndex.into()
3811 }
3812 DiffType::HeadToWorktree => {
3813 proto::git_diff::DiffType::HeadToWorktree.into()
3814 }
3815 },
3816 })
3817 .await?;
3818
3819 Ok(response.diff)
3820 }
3821 }
3822 })
3823 }
3824
3825 pub fn create_branch(&mut self, branch_name: String) -> oneshot::Receiver<Result<()>> {
3826 let id = self.id;
3827 self.send_job(
3828 Some(format!("git switch -c {branch_name}").into()),
3829 move |repo, _cx| async move {
3830 match repo {
3831 RepositoryState::Local { backend, .. } => {
3832 backend.create_branch(branch_name).await
3833 }
3834 RepositoryState::Remote { project_id, client } => {
3835 client
3836 .request(proto::GitCreateBranch {
3837 project_id: project_id.0,
3838 repository_id: id.to_proto(),
3839 branch_name,
3840 })
3841 .await?;
3842
3843 Ok(())
3844 }
3845 }
3846 },
3847 )
3848 }
3849
3850 pub fn change_branch(&mut self, branch_name: String) -> oneshot::Receiver<Result<()>> {
3851 let id = self.id;
3852 self.send_job(
3853 Some(format!("git switch {branch_name}").into()),
3854 move |repo, _cx| async move {
3855 match repo {
3856 RepositoryState::Local { backend, .. } => {
3857 backend.change_branch(branch_name).await
3858 }
3859 RepositoryState::Remote { project_id, client } => {
3860 client
3861 .request(proto::GitChangeBranch {
3862 project_id: project_id.0,
3863 repository_id: id.to_proto(),
3864 branch_name,
3865 })
3866 .await?;
3867
3868 Ok(())
3869 }
3870 }
3871 },
3872 )
3873 }
3874
3875 pub fn check_for_pushed_commits(&mut self) -> oneshot::Receiver<Result<Vec<SharedString>>> {
3876 let id = self.id;
3877 self.send_job(None, move |repo, _cx| async move {
3878 match repo {
3879 RepositoryState::Local { backend, .. } => backend.check_for_pushed_commit().await,
3880 RepositoryState::Remote { project_id, client } => {
3881 let response = client
3882 .request(proto::CheckForPushedCommits {
3883 project_id: project_id.0,
3884 repository_id: id.to_proto(),
3885 })
3886 .await?;
3887
3888 let branches = response.pushed_to.into_iter().map(Into::into).collect();
3889
3890 Ok(branches)
3891 }
3892 }
3893 })
3894 }
3895
3896 pub fn checkpoint(&mut self) -> oneshot::Receiver<Result<GitRepositoryCheckpoint>> {
3897 self.send_job(None, |repo, _cx| async move {
3898 match repo {
3899 RepositoryState::Local { backend, .. } => backend.checkpoint().await,
3900 RepositoryState::Remote { .. } => Err(anyhow!("not implemented yet")),
3901 }
3902 })
3903 }
3904
3905 pub fn restore_checkpoint(
3906 &mut self,
3907 checkpoint: GitRepositoryCheckpoint,
3908 ) -> oneshot::Receiver<Result<()>> {
3909 self.send_job(None, move |repo, _cx| async move {
3910 match repo {
3911 RepositoryState::Local { backend, .. } => {
3912 backend.restore_checkpoint(checkpoint).await
3913 }
3914 RepositoryState::Remote { .. } => Err(anyhow!("not implemented yet")),
3915 }
3916 })
3917 }
3918
3919 pub(crate) fn apply_remote_update(
3920 &mut self,
3921 update: proto::UpdateRepository,
3922 cx: &mut Context<Self>,
3923 ) -> Result<()> {
3924 let conflicted_paths = TreeSet::from_ordered_entries(
3925 update
3926 .current_merge_conflicts
3927 .into_iter()
3928 .map(|path| RepoPath(Path::new(&path).into())),
3929 );
3930 self.snapshot.branch = update.branch_summary.as_ref().map(proto_to_branch);
3931 self.snapshot.head_commit = update
3932 .head_commit_details
3933 .as_ref()
3934 .map(proto_to_commit_details);
3935
3936 self.snapshot.merge.conflicted_paths = conflicted_paths;
3937
3938 let edits = update
3939 .removed_statuses
3940 .into_iter()
3941 .map(|path| sum_tree::Edit::Remove(PathKey(FromProto::from_proto(path))))
3942 .chain(
3943 update
3944 .updated_statuses
3945 .into_iter()
3946 .filter_map(|updated_status| {
3947 Some(sum_tree::Edit::Insert(updated_status.try_into().log_err()?))
3948 }),
3949 )
3950 .collect::<Vec<_>>();
3951 self.snapshot.statuses_by_path.edit(edits, &());
3952 if update.is_last_update {
3953 self.snapshot.scan_id = update.scan_id;
3954 }
3955 cx.emit(RepositoryEvent::Updated { full_scan: true });
3956 Ok(())
3957 }
3958
3959 pub fn compare_checkpoints(
3960 &mut self,
3961 left: GitRepositoryCheckpoint,
3962 right: GitRepositoryCheckpoint,
3963 ) -> oneshot::Receiver<Result<bool>> {
3964 self.send_job(None, move |repo, _cx| async move {
3965 match repo {
3966 RepositoryState::Local { backend, .. } => {
3967 backend.compare_checkpoints(left, right).await
3968 }
3969 RepositoryState::Remote { .. } => Err(anyhow!("not implemented yet")),
3970 }
3971 })
3972 }
3973
3974 pub fn diff_checkpoints(
3975 &mut self,
3976 base_checkpoint: GitRepositoryCheckpoint,
3977 target_checkpoint: GitRepositoryCheckpoint,
3978 ) -> oneshot::Receiver<Result<String>> {
3979 self.send_job(None, move |repo, _cx| async move {
3980 match repo {
3981 RepositoryState::Local { backend, .. } => {
3982 backend
3983 .diff_checkpoints(base_checkpoint, target_checkpoint)
3984 .await
3985 }
3986 RepositoryState::Remote { .. } => Err(anyhow!("not implemented yet")),
3987 }
3988 })
3989 }
3990
3991 fn schedule_scan(
3992 &mut self,
3993 updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
3994 cx: &mut Context<Self>,
3995 ) {
3996 let this = cx.weak_entity();
3997 let _ = self.send_keyed_job(
3998 Some(GitJobKey::ReloadGitState),
3999 None,
4000 |state, mut cx| async move {
4001 let Some(this) = this.upgrade() else {
4002 return Ok(());
4003 };
4004 let RepositoryState::Local { backend, .. } = state else {
4005 bail!("not a local repository")
4006 };
4007 let (snapshot, events) = this
4008 .update(&mut cx, |this, _| {
4009 compute_snapshot(
4010 this.id,
4011 this.work_directory_abs_path.clone(),
4012 this.snapshot.clone(),
4013 backend.clone(),
4014 )
4015 })?
4016 .await?;
4017 this.update(&mut cx, |this, cx| {
4018 this.snapshot = snapshot.clone();
4019 for event in events {
4020 cx.emit(event);
4021 }
4022 })?;
4023 if let Some(updates_tx) = updates_tx {
4024 updates_tx
4025 .unbounded_send(DownstreamUpdate::UpdateRepository(snapshot))
4026 .ok();
4027 }
4028 Ok(())
4029 },
4030 );
4031 }
4032
4033 fn spawn_local_git_worker(
4034 work_directory_abs_path: Arc<Path>,
4035 dot_git_abs_path: Arc<Path>,
4036 _repository_dir_abs_path: Arc<Path>,
4037 _common_dir_abs_path: Arc<Path>,
4038 project_environment: WeakEntity<ProjectEnvironment>,
4039 fs: Arc<dyn Fs>,
4040 cx: &mut Context<Self>,
4041 ) -> mpsc::UnboundedSender<GitJob> {
4042 let (job_tx, mut job_rx) = mpsc::unbounded::<GitJob>();
4043
4044 cx.spawn(async move |_, cx| {
4045 let environment = project_environment
4046 .upgrade()
4047 .ok_or_else(|| anyhow!("missing project environment"))?
4048 .update(cx, |project_environment, cx| {
4049 project_environment.get_directory_environment(work_directory_abs_path.clone(), cx)
4050 })?
4051 .await
4052 .unwrap_or_else(|| {
4053 log::error!("failed to get working directory environment for repository {work_directory_abs_path:?}");
4054 HashMap::default()
4055 });
4056 let backend = cx
4057 .background_spawn(async move {
4058 fs.open_repo(&dot_git_abs_path)
4059 .ok_or_else(|| anyhow!("failed to build repository"))
4060 })
4061 .await?;
4062
4063 if let Some(git_hosting_provider_registry) =
4064 cx.update(|cx| GitHostingProviderRegistry::try_global(cx))?
4065 {
4066 git_hosting_providers::register_additional_providers(
4067 git_hosting_provider_registry,
4068 backend.clone(),
4069 );
4070 }
4071
4072 let state = RepositoryState::Local {
4073 backend,
4074 environment: Arc::new(environment),
4075 };
4076 let mut jobs = VecDeque::new();
4077 loop {
4078 while let Ok(Some(next_job)) = job_rx.try_next() {
4079 jobs.push_back(next_job);
4080 }
4081
4082 if let Some(job) = jobs.pop_front() {
4083 if let Some(current_key) = &job.key {
4084 if jobs
4085 .iter()
4086 .any(|other_job| other_job.key.as_ref() == Some(current_key))
4087 {
4088 continue;
4089 }
4090 }
4091 (job.job)(state.clone(), cx).await;
4092 } else if let Some(job) = job_rx.next().await {
4093 jobs.push_back(job);
4094 } else {
4095 break;
4096 }
4097 }
4098 anyhow::Ok(())
4099 })
4100 .detach_and_log_err(cx);
4101
4102 job_tx
4103 }
4104
4105 fn spawn_remote_git_worker(
4106 project_id: ProjectId,
4107 client: AnyProtoClient,
4108 cx: &mut Context<Self>,
4109 ) -> mpsc::UnboundedSender<GitJob> {
4110 let (job_tx, mut job_rx) = mpsc::unbounded::<GitJob>();
4111
4112 cx.spawn(async move |_, cx| {
4113 let state = RepositoryState::Remote { project_id, client };
4114 let mut jobs = VecDeque::new();
4115 loop {
4116 while let Ok(Some(next_job)) = job_rx.try_next() {
4117 jobs.push_back(next_job);
4118 }
4119
4120 if let Some(job) = jobs.pop_front() {
4121 if let Some(current_key) = &job.key {
4122 if jobs
4123 .iter()
4124 .any(|other_job| other_job.key.as_ref() == Some(current_key))
4125 {
4126 continue;
4127 }
4128 }
4129 (job.job)(state.clone(), cx).await;
4130 } else if let Some(job) = job_rx.next().await {
4131 jobs.push_back(job);
4132 } else {
4133 break;
4134 }
4135 }
4136 anyhow::Ok(())
4137 })
4138 .detach_and_log_err(cx);
4139
4140 job_tx
4141 }
4142
4143 fn load_staged_text(
4144 &mut self,
4145 buffer_id: BufferId,
4146 repo_path: RepoPath,
4147 cx: &App,
4148 ) -> Task<Result<Option<String>>> {
4149 let rx = self.send_job(None, move |state, _| async move {
4150 match state {
4151 RepositoryState::Local { backend, .. } => {
4152 anyhow::Ok(backend.load_index_text(repo_path).await)
4153 }
4154 RepositoryState::Remote { project_id, client } => {
4155 let response = client
4156 .request(proto::OpenUnstagedDiff {
4157 project_id: project_id.to_proto(),
4158 buffer_id: buffer_id.to_proto(),
4159 })
4160 .await?;
4161 Ok(response.staged_text)
4162 }
4163 }
4164 });
4165 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
4166 }
4167
4168 fn load_committed_text(
4169 &mut self,
4170 buffer_id: BufferId,
4171 repo_path: RepoPath,
4172 cx: &App,
4173 ) -> Task<Result<DiffBasesChange>> {
4174 let rx = self.send_job(None, move |state, _| async move {
4175 match state {
4176 RepositoryState::Local { backend, .. } => {
4177 let committed_text = backend.load_committed_text(repo_path.clone()).await;
4178 let staged_text = backend.load_index_text(repo_path).await;
4179 let diff_bases_change = if committed_text == staged_text {
4180 DiffBasesChange::SetBoth(committed_text)
4181 } else {
4182 DiffBasesChange::SetEach {
4183 index: staged_text,
4184 head: committed_text,
4185 }
4186 };
4187 anyhow::Ok(diff_bases_change)
4188 }
4189 RepositoryState::Remote { project_id, client } => {
4190 use proto::open_uncommitted_diff_response::Mode;
4191
4192 let response = client
4193 .request(proto::OpenUncommittedDiff {
4194 project_id: project_id.to_proto(),
4195 buffer_id: buffer_id.to_proto(),
4196 })
4197 .await?;
4198 let mode =
4199 Mode::from_i32(response.mode).ok_or_else(|| anyhow!("Invalid mode"))?;
4200 let bases = match mode {
4201 Mode::IndexMatchesHead => DiffBasesChange::SetBoth(response.committed_text),
4202 Mode::IndexAndHead => DiffBasesChange::SetEach {
4203 head: response.committed_text,
4204 index: response.staged_text,
4205 },
4206 };
4207 Ok(bases)
4208 }
4209 }
4210 });
4211
4212 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
4213 }
4214
4215 fn paths_changed(
4216 &mut self,
4217 paths: Vec<RepoPath>,
4218 updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
4219 cx: &mut Context<Self>,
4220 ) {
4221 self.paths_needing_status_update.extend(paths);
4222
4223 let this = cx.weak_entity();
4224 let _ = self.send_keyed_job(
4225 Some(GitJobKey::RefreshStatuses),
4226 None,
4227 |state, mut cx| async move {
4228 let (prev_snapshot, mut changed_paths) = this.update(&mut cx, |this, _| {
4229 (
4230 this.snapshot.clone(),
4231 mem::take(&mut this.paths_needing_status_update),
4232 )
4233 })?;
4234 let RepositoryState::Local { backend, .. } = state else {
4235 bail!("not a local repository")
4236 };
4237
4238 let paths = changed_paths.iter().cloned().collect::<Vec<_>>();
4239 let statuses = backend.status(&paths).await?;
4240
4241 let changed_path_statuses = cx
4242 .background_spawn(async move {
4243 let mut changed_path_statuses = Vec::new();
4244 let prev_statuses = prev_snapshot.statuses_by_path.clone();
4245 let mut cursor = prev_statuses.cursor::<PathProgress>(&());
4246
4247 for (repo_path, status) in &*statuses.entries {
4248 changed_paths.remove(repo_path);
4249 if cursor.seek_forward(&PathTarget::Path(repo_path), Bias::Left, &()) {
4250 if cursor.item().is_some_and(|entry| entry.status == *status) {
4251 continue;
4252 }
4253 }
4254
4255 changed_path_statuses.push(Edit::Insert(StatusEntry {
4256 repo_path: repo_path.clone(),
4257 status: *status,
4258 }));
4259 }
4260 let mut cursor = prev_statuses.cursor::<PathProgress>(&());
4261 for path in changed_paths.iter() {
4262 if cursor.seek_forward(&PathTarget::Path(&path), Bias::Left, &()) {
4263 changed_path_statuses.push(Edit::Remove(PathKey(path.0.clone())));
4264 }
4265 }
4266 changed_path_statuses
4267 })
4268 .await;
4269
4270 this.update(&mut cx, |this, cx| {
4271 if !changed_path_statuses.is_empty() {
4272 this.snapshot
4273 .statuses_by_path
4274 .edit(changed_path_statuses, &());
4275 this.snapshot.scan_id += 1;
4276 if let Some(updates_tx) = updates_tx {
4277 updates_tx
4278 .unbounded_send(DownstreamUpdate::UpdateRepository(
4279 this.snapshot.clone(),
4280 ))
4281 .ok();
4282 }
4283 }
4284 cx.emit(RepositoryEvent::Updated { full_scan: false });
4285 })
4286 },
4287 );
4288 }
4289
4290 /// currently running git command and when it started
4291 pub fn current_job(&self) -> Option<JobInfo> {
4292 self.active_jobs.values().next().cloned()
4293 }
4294
4295 pub fn barrier(&mut self) -> oneshot::Receiver<()> {
4296 self.send_job(None, |_, _| async {})
4297 }
4298}
4299
4300fn get_permalink_in_rust_registry_src(
4301 provider_registry: Arc<GitHostingProviderRegistry>,
4302 path: PathBuf,
4303 selection: Range<u32>,
4304) -> Result<url::Url> {
4305 #[derive(Deserialize)]
4306 struct CargoVcsGit {
4307 sha1: String,
4308 }
4309
4310 #[derive(Deserialize)]
4311 struct CargoVcsInfo {
4312 git: CargoVcsGit,
4313 path_in_vcs: String,
4314 }
4315
4316 #[derive(Deserialize)]
4317 struct CargoPackage {
4318 repository: String,
4319 }
4320
4321 #[derive(Deserialize)]
4322 struct CargoToml {
4323 package: CargoPackage,
4324 }
4325
4326 let Some((dir, cargo_vcs_info_json)) = path.ancestors().skip(1).find_map(|dir| {
4327 let json = std::fs::read_to_string(dir.join(".cargo_vcs_info.json")).ok()?;
4328 Some((dir, json))
4329 }) else {
4330 bail!("No .cargo_vcs_info.json found in parent directories")
4331 };
4332 let cargo_vcs_info = serde_json::from_str::<CargoVcsInfo>(&cargo_vcs_info_json)?;
4333 let cargo_toml = std::fs::read_to_string(dir.join("Cargo.toml"))?;
4334 let manifest = toml::from_str::<CargoToml>(&cargo_toml)?;
4335 let (provider, remote) = parse_git_remote_url(provider_registry, &manifest.package.repository)
4336 .ok_or_else(|| anyhow!("Failed to parse package.repository field of manifest"))?;
4337 let path = PathBuf::from(cargo_vcs_info.path_in_vcs).join(path.strip_prefix(dir).unwrap());
4338 let permalink = provider.build_permalink(
4339 remote,
4340 BuildPermalinkParams {
4341 sha: &cargo_vcs_info.git.sha1,
4342 path: &path.to_string_lossy(),
4343 selection: Some(selection),
4344 },
4345 );
4346 Ok(permalink)
4347}
4348
4349fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
4350 let Some(blame) = blame else {
4351 return proto::BlameBufferResponse {
4352 blame_response: None,
4353 };
4354 };
4355
4356 let entries = blame
4357 .entries
4358 .into_iter()
4359 .map(|entry| proto::BlameEntry {
4360 sha: entry.sha.as_bytes().into(),
4361 start_line: entry.range.start,
4362 end_line: entry.range.end,
4363 original_line_number: entry.original_line_number,
4364 author: entry.author.clone(),
4365 author_mail: entry.author_mail.clone(),
4366 author_time: entry.author_time,
4367 author_tz: entry.author_tz.clone(),
4368 committer: entry.committer_name.clone(),
4369 committer_mail: entry.committer_email.clone(),
4370 committer_time: entry.committer_time,
4371 committer_tz: entry.committer_tz.clone(),
4372 summary: entry.summary.clone(),
4373 previous: entry.previous.clone(),
4374 filename: entry.filename.clone(),
4375 })
4376 .collect::<Vec<_>>();
4377
4378 let messages = blame
4379 .messages
4380 .into_iter()
4381 .map(|(oid, message)| proto::CommitMessage {
4382 oid: oid.as_bytes().into(),
4383 message,
4384 })
4385 .collect::<Vec<_>>();
4386
4387 proto::BlameBufferResponse {
4388 blame_response: Some(proto::blame_buffer_response::BlameResponse {
4389 entries,
4390 messages,
4391 remote_url: blame.remote_url,
4392 }),
4393 }
4394}
4395
4396fn deserialize_blame_buffer_response(
4397 response: proto::BlameBufferResponse,
4398) -> Option<git::blame::Blame> {
4399 let response = response.blame_response?;
4400 let entries = response
4401 .entries
4402 .into_iter()
4403 .filter_map(|entry| {
4404 Some(git::blame::BlameEntry {
4405 sha: git::Oid::from_bytes(&entry.sha).ok()?,
4406 range: entry.start_line..entry.end_line,
4407 original_line_number: entry.original_line_number,
4408 committer_name: entry.committer,
4409 committer_time: entry.committer_time,
4410 committer_tz: entry.committer_tz,
4411 committer_email: entry.committer_mail,
4412 author: entry.author,
4413 author_mail: entry.author_mail,
4414 author_time: entry.author_time,
4415 author_tz: entry.author_tz,
4416 summary: entry.summary,
4417 previous: entry.previous,
4418 filename: entry.filename,
4419 })
4420 })
4421 .collect::<Vec<_>>();
4422
4423 let messages = response
4424 .messages
4425 .into_iter()
4426 .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
4427 .collect::<HashMap<_, _>>();
4428
4429 Some(Blame {
4430 entries,
4431 messages,
4432 remote_url: response.remote_url,
4433 })
4434}
4435
4436fn branch_to_proto(branch: &git::repository::Branch) -> proto::Branch {
4437 proto::Branch {
4438 is_head: branch.is_head,
4439 name: branch.name.to_string(),
4440 unix_timestamp: branch
4441 .most_recent_commit
4442 .as_ref()
4443 .map(|commit| commit.commit_timestamp as u64),
4444 upstream: branch.upstream.as_ref().map(|upstream| proto::GitUpstream {
4445 ref_name: upstream.ref_name.to_string(),
4446 tracking: upstream
4447 .tracking
4448 .status()
4449 .map(|upstream| proto::UpstreamTracking {
4450 ahead: upstream.ahead as u64,
4451 behind: upstream.behind as u64,
4452 }),
4453 }),
4454 most_recent_commit: branch
4455 .most_recent_commit
4456 .as_ref()
4457 .map(|commit| proto::CommitSummary {
4458 sha: commit.sha.to_string(),
4459 subject: commit.subject.to_string(),
4460 commit_timestamp: commit.commit_timestamp,
4461 }),
4462 }
4463}
4464
4465fn proto_to_branch(proto: &proto::Branch) -> git::repository::Branch {
4466 git::repository::Branch {
4467 is_head: proto.is_head,
4468 name: proto.name.clone().into(),
4469 upstream: proto
4470 .upstream
4471 .as_ref()
4472 .map(|upstream| git::repository::Upstream {
4473 ref_name: upstream.ref_name.to_string().into(),
4474 tracking: upstream
4475 .tracking
4476 .as_ref()
4477 .map(|tracking| {
4478 git::repository::UpstreamTracking::Tracked(UpstreamTrackingStatus {
4479 ahead: tracking.ahead as u32,
4480 behind: tracking.behind as u32,
4481 })
4482 })
4483 .unwrap_or(git::repository::UpstreamTracking::Gone),
4484 }),
4485 most_recent_commit: proto.most_recent_commit.as_ref().map(|commit| {
4486 git::repository::CommitSummary {
4487 sha: commit.sha.to_string().into(),
4488 subject: commit.subject.to_string().into(),
4489 commit_timestamp: commit.commit_timestamp,
4490 has_parent: true,
4491 }
4492 }),
4493 }
4494}
4495
4496fn commit_details_to_proto(commit: &CommitDetails) -> proto::GitCommitDetails {
4497 proto::GitCommitDetails {
4498 sha: commit.sha.to_string(),
4499 message: commit.message.to_string(),
4500 commit_timestamp: commit.commit_timestamp,
4501 author_email: commit.author_email.to_string(),
4502 author_name: commit.author_name.to_string(),
4503 }
4504}
4505
4506fn proto_to_commit_details(proto: &proto::GitCommitDetails) -> CommitDetails {
4507 CommitDetails {
4508 sha: proto.sha.clone().into(),
4509 message: proto.message.clone().into(),
4510 commit_timestamp: proto.commit_timestamp,
4511 author_email: proto.author_email.clone().into(),
4512 author_name: proto.author_name.clone().into(),
4513 }
4514}
4515
4516async fn compute_snapshot(
4517 id: RepositoryId,
4518 work_directory_abs_path: Arc<Path>,
4519 prev_snapshot: RepositorySnapshot,
4520 backend: Arc<dyn GitRepository>,
4521) -> Result<(RepositorySnapshot, Vec<RepositoryEvent>)> {
4522 let mut events = Vec::new();
4523 let branches = backend.branches().await?;
4524 let branch = branches.into_iter().find(|branch| branch.is_head);
4525 let statuses = backend.status(&[WORK_DIRECTORY_REPO_PATH.clone()]).await?;
4526 let statuses_by_path = SumTree::from_iter(
4527 statuses
4528 .entries
4529 .iter()
4530 .map(|(repo_path, status)| StatusEntry {
4531 repo_path: repo_path.clone(),
4532 status: *status,
4533 }),
4534 &(),
4535 );
4536 let (merge_details, merge_heads_changed) =
4537 MergeDetails::load(&backend, &statuses_by_path, &prev_snapshot).await?;
4538
4539 if merge_heads_changed
4540 || branch != prev_snapshot.branch
4541 || statuses_by_path != prev_snapshot.statuses_by_path
4542 {
4543 events.push(RepositoryEvent::Updated { full_scan: true });
4544 }
4545
4546 // Cache merge conflict paths so they don't change from staging/unstaging,
4547 // until the merge heads change (at commit time, etc.).
4548 if merge_heads_changed {
4549 events.push(RepositoryEvent::MergeHeadsChanged);
4550 }
4551
4552 // Useful when branch is None in detached head state
4553 let head_commit = match backend.head_sha().await {
4554 Some(head_sha) => backend.show(head_sha).await.log_err(),
4555 None => None,
4556 };
4557
4558 let snapshot = RepositorySnapshot {
4559 id,
4560 statuses_by_path,
4561 work_directory_abs_path,
4562 scan_id: prev_snapshot.scan_id + 1,
4563 branch,
4564 head_commit,
4565 merge: merge_details,
4566 };
4567
4568 Ok((snapshot, events))
4569}
4570
4571fn status_from_proto(
4572 simple_status: i32,
4573 status: Option<proto::GitFileStatus>,
4574) -> anyhow::Result<FileStatus> {
4575 use proto::git_file_status::Variant;
4576
4577 let Some(variant) = status.and_then(|status| status.variant) else {
4578 let code = proto::GitStatus::from_i32(simple_status)
4579 .ok_or_else(|| anyhow!("Invalid git status code: {simple_status}"))?;
4580 let result = match code {
4581 proto::GitStatus::Added => TrackedStatus {
4582 worktree_status: StatusCode::Added,
4583 index_status: StatusCode::Unmodified,
4584 }
4585 .into(),
4586 proto::GitStatus::Modified => TrackedStatus {
4587 worktree_status: StatusCode::Modified,
4588 index_status: StatusCode::Unmodified,
4589 }
4590 .into(),
4591 proto::GitStatus::Conflict => UnmergedStatus {
4592 first_head: UnmergedStatusCode::Updated,
4593 second_head: UnmergedStatusCode::Updated,
4594 }
4595 .into(),
4596 proto::GitStatus::Deleted => TrackedStatus {
4597 worktree_status: StatusCode::Deleted,
4598 index_status: StatusCode::Unmodified,
4599 }
4600 .into(),
4601 _ => return Err(anyhow!("Invalid code for simple status: {simple_status}")),
4602 };
4603 return Ok(result);
4604 };
4605
4606 let result = match variant {
4607 Variant::Untracked(_) => FileStatus::Untracked,
4608 Variant::Ignored(_) => FileStatus::Ignored,
4609 Variant::Unmerged(unmerged) => {
4610 let [first_head, second_head] =
4611 [unmerged.first_head, unmerged.second_head].map(|head| {
4612 let code = proto::GitStatus::from_i32(head)
4613 .ok_or_else(|| anyhow!("Invalid git status code: {head}"))?;
4614 let result = match code {
4615 proto::GitStatus::Added => UnmergedStatusCode::Added,
4616 proto::GitStatus::Updated => UnmergedStatusCode::Updated,
4617 proto::GitStatus::Deleted => UnmergedStatusCode::Deleted,
4618 _ => return Err(anyhow!("Invalid code for unmerged status: {code:?}")),
4619 };
4620 Ok(result)
4621 });
4622 let [first_head, second_head] = [first_head?, second_head?];
4623 UnmergedStatus {
4624 first_head,
4625 second_head,
4626 }
4627 .into()
4628 }
4629 Variant::Tracked(tracked) => {
4630 let [index_status, worktree_status] = [tracked.index_status, tracked.worktree_status]
4631 .map(|status| {
4632 let code = proto::GitStatus::from_i32(status)
4633 .ok_or_else(|| anyhow!("Invalid git status code: {status}"))?;
4634 let result = match code {
4635 proto::GitStatus::Modified => StatusCode::Modified,
4636 proto::GitStatus::TypeChanged => StatusCode::TypeChanged,
4637 proto::GitStatus::Added => StatusCode::Added,
4638 proto::GitStatus::Deleted => StatusCode::Deleted,
4639 proto::GitStatus::Renamed => StatusCode::Renamed,
4640 proto::GitStatus::Copied => StatusCode::Copied,
4641 proto::GitStatus::Unmodified => StatusCode::Unmodified,
4642 _ => return Err(anyhow!("Invalid code for tracked status: {code:?}")),
4643 };
4644 Ok(result)
4645 });
4646 let [index_status, worktree_status] = [index_status?, worktree_status?];
4647 TrackedStatus {
4648 index_status,
4649 worktree_status,
4650 }
4651 .into()
4652 }
4653 };
4654 Ok(result)
4655}
4656
4657fn status_to_proto(status: FileStatus) -> proto::GitFileStatus {
4658 use proto::git_file_status::{Tracked, Unmerged, Variant};
4659
4660 let variant = match status {
4661 FileStatus::Untracked => Variant::Untracked(Default::default()),
4662 FileStatus::Ignored => Variant::Ignored(Default::default()),
4663 FileStatus::Unmerged(UnmergedStatus {
4664 first_head,
4665 second_head,
4666 }) => Variant::Unmerged(Unmerged {
4667 first_head: unmerged_status_to_proto(first_head),
4668 second_head: unmerged_status_to_proto(second_head),
4669 }),
4670 FileStatus::Tracked(TrackedStatus {
4671 index_status,
4672 worktree_status,
4673 }) => Variant::Tracked(Tracked {
4674 index_status: tracked_status_to_proto(index_status),
4675 worktree_status: tracked_status_to_proto(worktree_status),
4676 }),
4677 };
4678 proto::GitFileStatus {
4679 variant: Some(variant),
4680 }
4681}
4682
4683fn unmerged_status_to_proto(code: UnmergedStatusCode) -> i32 {
4684 match code {
4685 UnmergedStatusCode::Added => proto::GitStatus::Added as _,
4686 UnmergedStatusCode::Deleted => proto::GitStatus::Deleted as _,
4687 UnmergedStatusCode::Updated => proto::GitStatus::Updated as _,
4688 }
4689}
4690
4691fn tracked_status_to_proto(code: StatusCode) -> i32 {
4692 match code {
4693 StatusCode::Added => proto::GitStatus::Added as _,
4694 StatusCode::Deleted => proto::GitStatus::Deleted as _,
4695 StatusCode::Modified => proto::GitStatus::Modified as _,
4696 StatusCode::Renamed => proto::GitStatus::Renamed as _,
4697 StatusCode::TypeChanged => proto::GitStatus::TypeChanged as _,
4698 StatusCode::Copied => proto::GitStatus::Copied as _,
4699 StatusCode::Unmodified => proto::GitStatus::Unmodified as _,
4700 }
4701}