1mod conflict_set;
2pub mod git_traversal;
3
4use crate::{
5 ProjectEnvironment, ProjectItem, ProjectPath,
6 buffer_store::{BufferStore, BufferStoreEvent},
7 worktree_store::{WorktreeStore, WorktreeStoreEvent},
8};
9use anyhow::{Context as _, Result, anyhow, bail};
10use askpass::AskPassDelegate;
11use buffer_diff::{BufferDiff, BufferDiffEvent};
12use client::ProjectId;
13use collections::HashMap;
14pub use conflict_set::{ConflictRegion, ConflictSet, ConflictSetSnapshot, ConflictSetUpdate};
15use fs::Fs;
16use futures::{
17 FutureExt, StreamExt as _,
18 channel::{mpsc, oneshot},
19 future::{self, Shared},
20};
21use git::{
22 BuildPermalinkParams, GitHostingProviderRegistry, WORK_DIRECTORY_REPO_PATH,
23 blame::Blame,
24 parse_git_remote_url,
25 repository::{
26 Branch, CommitDetails, CommitDiff, CommitFile, CommitOptions, DiffType, GitRepository,
27 GitRepositoryCheckpoint, PushOptions, Remote, RemoteCommandOutput, RepoPath, ResetMode,
28 UpstreamTrackingStatus,
29 },
30 status::{
31 FileStatus, GitSummary, StatusCode, TrackedStatus, UnmergedStatus, UnmergedStatusCode,
32 },
33};
34use gpui::{
35 App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Subscription, Task,
36 WeakEntity,
37};
38use language::{
39 Buffer, BufferEvent, Language, LanguageRegistry,
40 proto::{deserialize_version, serialize_version},
41};
42use parking_lot::Mutex;
43use postage::stream::Stream as _;
44use rpc::{
45 AnyProtoClient, TypedEnvelope,
46 proto::{self, FromProto, SSH_PROJECT_ID, ToProto, git_reset, split_repository_update},
47};
48use serde::Deserialize;
49use std::{
50 cmp::Ordering,
51 collections::{BTreeSet, VecDeque},
52 future::Future,
53 mem,
54 ops::Range,
55 path::{Path, PathBuf},
56 sync::{
57 Arc,
58 atomic::{self, AtomicU64},
59 },
60 time::Instant,
61};
62use sum_tree::{Edit, SumTree, TreeSet};
63use text::{Bias, BufferId};
64use util::{ResultExt, debug_panic, post_inc};
65use worktree::{
66 File, PathKey, PathProgress, PathSummary, PathTarget, UpdatedGitRepositoriesSet,
67 UpdatedGitRepository, Worktree,
68};
69
70pub struct GitStore {
71 state: GitStoreState,
72 buffer_store: Entity<BufferStore>,
73 worktree_store: Entity<WorktreeStore>,
74 repositories: HashMap<RepositoryId, Entity<Repository>>,
75 active_repo_id: Option<RepositoryId>,
76 #[allow(clippy::type_complexity)]
77 loading_diffs:
78 HashMap<(BufferId, DiffKind), Shared<Task<Result<Entity<BufferDiff>, Arc<anyhow::Error>>>>>,
79 diffs: HashMap<BufferId, Entity<BufferGitState>>,
80 shared_diffs: HashMap<proto::PeerId, HashMap<BufferId, SharedDiffs>>,
81 _subscriptions: Vec<Subscription>,
82}
83
84#[derive(Default)]
85struct SharedDiffs {
86 unstaged: Option<Entity<BufferDiff>>,
87 uncommitted: Option<Entity<BufferDiff>>,
88}
89
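/// Git-related state tracked for a single open buffer: weak handles to its
/// unstaged and uncommitted diffs and its conflict set, plus the tasks that
/// keep them up to date.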
90struct BufferGitState {
91 unstaged_diff: Option<WeakEntity<BufferDiff>>,
92 uncommitted_diff: Option<WeakEntity<BufferDiff>>,
93 conflict_set: Option<WeakEntity<ConflictSet>>,
94 recalculate_diff_task: Option<Task<Result<()>>>,
95 reparse_conflict_markers_task: Option<Task<Result<()>>>,
96 language: Option<Arc<Language>>,
97 language_registry: Option<Arc<LanguageRegistry>>,
98 conflict_updated_futures: Vec<oneshot::Sender<()>>,
99 recalculating_tx: postage::watch::Sender<bool>,
100
101 /// These operation counts are used to ensure that head and index text
102 /// values read from the git repository are up-to-date with any hunk staging
103 /// operations that have been performed on the BufferDiff.
104 ///
105 /// The operation count is incremented immediately when the user initiates a
106 /// hunk stage/unstage operation. Then, upon finishing writing the new index
    /// text to disk, the `operation count as of write` is updated to reflect
108 /// the operation count that prompted the write.
109 hunk_staging_operation_count: usize,
110 hunk_staging_operation_count_as_of_write: usize,
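    // Illustrative sequence (hypothetical counts): staging a hunk bumps
    // `hunk_staging_operation_count` from 2 to 3; once the resulting index
    // write completes, `hunk_staging_operation_count_as_of_write` is set to 3.
    // Until the two match, index text read back from the repository may be
    // stale with respect to that staging operation.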
111
112 head_text: Option<Arc<String>>,
113 index_text: Option<Arc<String>>,
114 head_changed: bool,
115 index_changed: bool,
116 language_changed: bool,
117}
118
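/// A change to the base texts that a buffer's diffs are computed against:
/// the index text, the HEAD text, both independently, or both set to the
/// same contents.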
119#[derive(Clone, Debug)]
120enum DiffBasesChange {
121 SetIndex(Option<String>),
122 SetHead(Option<String>),
123 SetEach {
124 index: Option<String>,
125 head: Option<String>,
126 },
127 SetBoth(Option<String>),
128}
129
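/// Which diff is tracked for a buffer: the unstaged diff (working copy vs.
/// index) or the uncommitted diff (working copy vs. HEAD).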
130#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
131enum DiffKind {
132 Unstaged,
133 Uncommitted,
134}
135
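/// Where this store's git data comes from: the local filesystem, an upstream
/// client reached over SSH, or an upstream collaborator's project.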
136enum GitStoreState {
137 Local {
138 next_repository_id: Arc<AtomicU64>,
139 downstream: Option<LocalDownstreamState>,
140 project_environment: Entity<ProjectEnvironment>,
141 fs: Arc<dyn Fs>,
142 },
143 Ssh {
144 upstream_client: AnyProtoClient,
145 upstream_project_id: ProjectId,
146 downstream: Option<(AnyProtoClient, ProjectId)>,
147 },
148 Remote {
149 upstream_client: AnyProtoClient,
150 upstream_project_id: ProjectId,
151 },
152}
153
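/// An update to forward to the downstream client when a local project is shared.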
154enum DownstreamUpdate {
155 UpdateRepository(RepositorySnapshot),
156 RemoveRepository(RepositoryId),
157}
158
159struct LocalDownstreamState {
160 client: AnyProtoClient,
161 project_id: ProjectId,
162 updates_tx: mpsc::UnboundedSender<DownstreamUpdate>,
163 _task: Task<Result<()>>,
164}
165
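/// A checkpoint of every repository in the project, keyed by each repository's
/// work directory path.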
166#[derive(Clone, Debug)]
167pub struct GitStoreCheckpoint {
168 checkpoints_by_work_dir_abs_path: HashMap<Arc<Path>, GitRepositoryCheckpoint>,
169}
170
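/// A path within a git repository together with its current git status.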
171#[derive(Clone, Debug, PartialEq, Eq)]
172pub struct StatusEntry {
173 pub repo_path: RepoPath,
174 pub status: FileStatus,
175}
176
177impl StatusEntry {
178 fn to_proto(&self) -> proto::StatusEntry {
179 let simple_status = match self.status {
180 FileStatus::Ignored | FileStatus::Untracked => proto::GitStatus::Added as i32,
181 FileStatus::Unmerged { .. } => proto::GitStatus::Conflict as i32,
182 FileStatus::Tracked(TrackedStatus {
183 index_status,
184 worktree_status,
185 }) => tracked_status_to_proto(if worktree_status != StatusCode::Unmodified {
186 worktree_status
187 } else {
188 index_status
189 }),
190 };
191
192 proto::StatusEntry {
193 repo_path: self.repo_path.as_ref().to_proto(),
194 simple_status,
195 status: Some(status_to_proto(self.status)),
196 }
197 }
198}
199
200impl TryFrom<proto::StatusEntry> for StatusEntry {
201 type Error = anyhow::Error;
202
203 fn try_from(value: proto::StatusEntry) -> Result<Self, Self::Error> {
204 let repo_path = RepoPath(Arc::<Path>::from_proto(value.repo_path));
205 let status = status_from_proto(value.simple_status, value.status)?;
206 Ok(Self { repo_path, status })
207 }
208}
209
210impl sum_tree::Item for StatusEntry {
211 type Summary = PathSummary<GitSummary>;
212
213 fn summary(&self, _: &<Self::Summary as sum_tree::Summary>::Context) -> Self::Summary {
214 PathSummary {
215 max_path: self.repo_path.0.clone(),
216 item_summary: self.status.summary(),
217 }
218 }
219}
220
221impl sum_tree::KeyedItem for StatusEntry {
222 type Key = PathKey;
223
224 fn key(&self) -> Self::Key {
225 PathKey(self.repo_path.0.clone())
226 }
227}
228
229#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
230pub struct RepositoryId(pub u64);
231
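/// The repository's current merge state: conflicted paths, the prepared merge
/// message (if any), and the merge heads.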
232#[derive(Clone, Debug, Default, PartialEq, Eq)]
233pub struct MergeDetails {
234 pub conflicted_paths: TreeSet<RepoPath>,
235 pub message: Option<SharedString>,
236 pub heads: Vec<Option<SharedString>>,
237}
238
239#[derive(Clone, Debug, PartialEq, Eq)]
240pub struct RepositorySnapshot {
241 pub id: RepositoryId,
242 pub statuses_by_path: SumTree<StatusEntry>,
243 pub work_directory_abs_path: Arc<Path>,
244 pub branch: Option<Branch>,
245 pub head_commit: Option<CommitDetails>,
246 pub scan_id: u64,
247 pub merge: MergeDetails,
248}
249
250type JobId = u64;
251
252#[derive(Clone, Debug, PartialEq, Eq)]
253pub struct JobInfo {
254 pub start: Instant,
255 pub message: SharedString,
256}
257
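/// A git repository tracked by the `GitStore`, combining its latest snapshot
/// with the queue of jobs that operate on it.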
258pub struct Repository {
259 this: WeakEntity<Self>,
260 snapshot: RepositorySnapshot,
261 commit_message_buffer: Option<Entity<Buffer>>,
262 git_store: WeakEntity<GitStore>,
263 // For a local repository, holds paths that have had worktree events since the last status scan completed,
264 // and that should be examined during the next status scan.
265 paths_needing_status_update: BTreeSet<RepoPath>,
266 job_sender: mpsc::UnboundedSender<GitJob>,
267 active_jobs: HashMap<JobId, JobInfo>,
268 job_id: JobId,
269 askpass_delegates: Arc<Mutex<HashMap<u64, AskPassDelegate>>>,
270 latest_askpass_id: u64,
271}
272
273impl std::ops::Deref for Repository {
274 type Target = RepositorySnapshot;
275
276 fn deref(&self) -> &Self::Target {
277 &self.snapshot
278 }
279}
280
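/// How git operations are executed for a repository: directly against a local
/// backend (with its environment), or by sending requests to an upstream client.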
281#[derive(Clone)]
282pub enum RepositoryState {
283 Local {
284 backend: Arc<dyn GitRepository>,
285 environment: Arc<HashMap<String, String>>,
286 },
287 Remote {
288 project_id: ProjectId,
289 client: AnyProtoClient,
290 },
291}
292
293#[derive(Clone, Debug)]
294pub enum RepositoryEvent {
295 Updated { full_scan: bool },
296 MergeHeadsChanged,
297}
298
299#[derive(Clone, Debug)]
300pub struct JobsUpdated;
301
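/// Events emitted by the `GitStore`. The `bool` in `RepositoryUpdated`
/// indicates whether the updated repository is the active one.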
302#[derive(Debug)]
303pub enum GitStoreEvent {
304 ActiveRepositoryChanged(Option<RepositoryId>),
305 RepositoryUpdated(RepositoryId, RepositoryEvent, bool),
306 RepositoryAdded(RepositoryId),
307 RepositoryRemoved(RepositoryId),
308 IndexWriteError(anyhow::Error),
309 JobsUpdated,
310 ConflictsUpdated,
311}
312
313impl EventEmitter<RepositoryEvent> for Repository {}
314impl EventEmitter<JobsUpdated> for Repository {}
315impl EventEmitter<GitStoreEvent> for GitStore {}
316
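/// A queued unit of git work to run against a repository, with an optional key
/// identifying the kind of job.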
317pub struct GitJob {
318 job: Box<dyn FnOnce(RepositoryState, &mut AsyncApp) -> Task<()>>,
319 key: Option<GitJobKey>,
320}
321
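/// Identifies a kind of queued git job, such as writing the index text for a
/// particular path or refreshing statuses.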
322#[derive(PartialEq, Eq)]
323enum GitJobKey {
324 WriteIndex(RepoPath),
325 ReloadBufferDiffBases,
326 RefreshStatuses,
327 ReloadGitState,
328}
329
330impl GitStore {
331 pub fn local(
332 worktree_store: &Entity<WorktreeStore>,
333 buffer_store: Entity<BufferStore>,
334 environment: Entity<ProjectEnvironment>,
335 fs: Arc<dyn Fs>,
336 cx: &mut Context<Self>,
337 ) -> Self {
338 Self::new(
339 worktree_store.clone(),
340 buffer_store,
341 GitStoreState::Local {
342 next_repository_id: Arc::new(AtomicU64::new(1)),
343 downstream: None,
344 project_environment: environment,
345 fs,
346 },
347 cx,
348 )
349 }
350
351 pub fn remote(
352 worktree_store: &Entity<WorktreeStore>,
353 buffer_store: Entity<BufferStore>,
354 upstream_client: AnyProtoClient,
355 project_id: ProjectId,
356 cx: &mut Context<Self>,
357 ) -> Self {
358 Self::new(
359 worktree_store.clone(),
360 buffer_store,
361 GitStoreState::Remote {
362 upstream_client,
363 upstream_project_id: project_id,
364 },
365 cx,
366 )
367 }
368
369 pub fn ssh(
370 worktree_store: &Entity<WorktreeStore>,
371 buffer_store: Entity<BufferStore>,
372 upstream_client: AnyProtoClient,
373 cx: &mut Context<Self>,
374 ) -> Self {
375 Self::new(
376 worktree_store.clone(),
377 buffer_store,
378 GitStoreState::Ssh {
379 upstream_client,
380 upstream_project_id: ProjectId(SSH_PROJECT_ID),
381 downstream: None,
382 },
383 cx,
384 )
385 }
386
387 fn new(
388 worktree_store: Entity<WorktreeStore>,
389 buffer_store: Entity<BufferStore>,
390 state: GitStoreState,
391 cx: &mut Context<Self>,
392 ) -> Self {
393 let _subscriptions = vec![
394 cx.subscribe(&worktree_store, Self::on_worktree_store_event),
395 cx.subscribe(&buffer_store, Self::on_buffer_store_event),
396 ];
397
398 GitStore {
399 state,
400 buffer_store,
401 worktree_store,
402 repositories: HashMap::default(),
403 active_repo_id: None,
404 _subscriptions,
405 loading_diffs: HashMap::default(),
406 shared_diffs: HashMap::default(),
407 diffs: HashMap::default(),
408 }
409 }
410
411 pub fn init(client: &AnyProtoClient) {
412 client.add_entity_request_handler(Self::handle_get_remotes);
413 client.add_entity_request_handler(Self::handle_get_branches);
414 client.add_entity_request_handler(Self::handle_change_branch);
415 client.add_entity_request_handler(Self::handle_create_branch);
416 client.add_entity_request_handler(Self::handle_git_init);
417 client.add_entity_request_handler(Self::handle_push);
418 client.add_entity_request_handler(Self::handle_pull);
419 client.add_entity_request_handler(Self::handle_fetch);
420 client.add_entity_request_handler(Self::handle_stage);
421 client.add_entity_request_handler(Self::handle_unstage);
422 client.add_entity_request_handler(Self::handle_commit);
423 client.add_entity_request_handler(Self::handle_reset);
424 client.add_entity_request_handler(Self::handle_show);
425 client.add_entity_request_handler(Self::handle_load_commit_diff);
426 client.add_entity_request_handler(Self::handle_checkout_files);
427 client.add_entity_request_handler(Self::handle_open_commit_message_buffer);
428 client.add_entity_request_handler(Self::handle_set_index_text);
429 client.add_entity_request_handler(Self::handle_askpass);
430 client.add_entity_request_handler(Self::handle_check_for_pushed_commits);
431 client.add_entity_request_handler(Self::handle_git_diff);
432 client.add_entity_request_handler(Self::handle_open_unstaged_diff);
433 client.add_entity_request_handler(Self::handle_open_uncommitted_diff);
434 client.add_entity_message_handler(Self::handle_update_diff_bases);
435 client.add_entity_request_handler(Self::handle_get_permalink_to_line);
436 client.add_entity_request_handler(Self::handle_blame_buffer);
437 client.add_entity_message_handler(Self::handle_update_repository);
438 client.add_entity_message_handler(Self::handle_remove_repository);
439 }
440
441 pub fn is_local(&self) -> bool {
442 matches!(self.state, GitStoreState::Local { .. })
443 }
444
445 pub fn shared(&mut self, project_id: u64, client: AnyProtoClient, cx: &mut Context<Self>) {
446 match &mut self.state {
447 GitStoreState::Ssh {
448 downstream: downstream_client,
449 ..
450 } => {
451 for repo in self.repositories.values() {
452 let update = repo.read(cx).snapshot.initial_update(project_id);
453 for update in split_repository_update(update) {
454 client.send(update).log_err();
455 }
456 }
457 *downstream_client = Some((client, ProjectId(project_id)));
458 }
459 GitStoreState::Local {
460 downstream: downstream_client,
461 ..
462 } => {
463 let mut snapshots = HashMap::default();
464 let (updates_tx, mut updates_rx) = mpsc::unbounded();
465 for repo in self.repositories.values() {
466 updates_tx
467 .unbounded_send(DownstreamUpdate::UpdateRepository(
468 repo.read(cx).snapshot.clone(),
469 ))
470 .ok();
471 }
472 *downstream_client = Some(LocalDownstreamState {
473 client: client.clone(),
474 project_id: ProjectId(project_id),
475 updates_tx,
476 _task: cx.spawn(async move |this, cx| {
477 cx.background_spawn(async move {
478 while let Some(update) = updates_rx.next().await {
479 match update {
480 DownstreamUpdate::UpdateRepository(snapshot) => {
481 if let Some(old_snapshot) = snapshots.get_mut(&snapshot.id)
482 {
483 let update =
484 snapshot.build_update(old_snapshot, project_id);
485 *old_snapshot = snapshot;
486 for update in split_repository_update(update) {
487 client.send(update)?;
488 }
489 } else {
490 let update = snapshot.initial_update(project_id);
491 for update in split_repository_update(update) {
492 client.send(update)?;
493 }
494 snapshots.insert(snapshot.id, snapshot);
495 }
496 }
497 DownstreamUpdate::RemoveRepository(id) => {
498 client.send(proto::RemoveRepository {
499 project_id,
500 id: id.to_proto(),
501 })?;
502 }
503 }
504 }
505 anyhow::Ok(())
506 })
507 .await
508 .ok();
509 this.update(cx, |this, _| {
510 if let GitStoreState::Local {
511 downstream: downstream_client,
512 ..
513 } = &mut this.state
514 {
515 downstream_client.take();
516 } else {
517 unreachable!("unshared called on remote store");
518 }
519 })
520 }),
521 });
522 }
523 GitStoreState::Remote { .. } => {
524 debug_panic!("shared called on remote store");
525 }
526 }
527 }
528
529 pub fn unshared(&mut self, _cx: &mut Context<Self>) {
530 match &mut self.state {
531 GitStoreState::Local {
532 downstream: downstream_client,
533 ..
534 } => {
535 downstream_client.take();
536 }
537 GitStoreState::Ssh {
538 downstream: downstream_client,
539 ..
540 } => {
541 downstream_client.take();
542 }
543 GitStoreState::Remote { .. } => {
544 debug_panic!("unshared called on remote store");
545 }
546 }
547 self.shared_diffs.clear();
548 }
549
550 pub(crate) fn forget_shared_diffs_for(&mut self, peer_id: &proto::PeerId) {
551 self.shared_diffs.remove(peer_id);
552 }
553
554 pub fn active_repository(&self) -> Option<Entity<Repository>> {
555 self.active_repo_id
556 .as_ref()
            .map(|id| self.repositories[id].clone())
558 }
559
560 pub fn open_unstaged_diff(
561 &mut self,
562 buffer: Entity<Buffer>,
563 cx: &mut Context<Self>,
564 ) -> Task<Result<Entity<BufferDiff>>> {
565 let buffer_id = buffer.read(cx).remote_id();
566 if let Some(diff_state) = self.diffs.get(&buffer_id) {
567 if let Some(unstaged_diff) = diff_state
568 .read(cx)
569 .unstaged_diff
570 .as_ref()
571 .and_then(|weak| weak.upgrade())
572 {
573 if let Some(task) =
574 diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
575 {
576 return cx.background_executor().spawn(async move {
577 task.await;
578 Ok(unstaged_diff)
579 });
580 }
581 return Task::ready(Ok(unstaged_diff));
582 }
583 }
584
585 let Some((repo, repo_path)) =
586 self.repository_and_path_for_buffer_id(buffer.read(cx).remote_id(), cx)
587 else {
588 return Task::ready(Err(anyhow!("failed to find git repository for buffer")));
589 };
590
591 let task = self
592 .loading_diffs
593 .entry((buffer_id, DiffKind::Unstaged))
594 .or_insert_with(|| {
595 let staged_text = repo.update(cx, |repo, cx| {
596 repo.load_staged_text(buffer_id, repo_path, cx)
597 });
598 cx.spawn(async move |this, cx| {
599 Self::open_diff_internal(
600 this,
601 DiffKind::Unstaged,
602 staged_text.await.map(DiffBasesChange::SetIndex),
603 buffer,
604 cx,
605 )
606 .await
607 .map_err(Arc::new)
608 })
609 .shared()
610 })
611 .clone();
612
613 cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
614 }
615
616 pub fn open_uncommitted_diff(
617 &mut self,
618 buffer: Entity<Buffer>,
619 cx: &mut Context<Self>,
620 ) -> Task<Result<Entity<BufferDiff>>> {
621 let buffer_id = buffer.read(cx).remote_id();
622
623 if let Some(diff_state) = self.diffs.get(&buffer_id) {
624 if let Some(uncommitted_diff) = diff_state
625 .read(cx)
626 .uncommitted_diff
627 .as_ref()
628 .and_then(|weak| weak.upgrade())
629 {
630 if let Some(task) =
631 diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
632 {
633 return cx.background_executor().spawn(async move {
634 task.await;
635 Ok(uncommitted_diff)
636 });
637 }
638 return Task::ready(Ok(uncommitted_diff));
639 }
640 }
641
642 let Some((repo, repo_path)) =
643 self.repository_and_path_for_buffer_id(buffer.read(cx).remote_id(), cx)
644 else {
645 return Task::ready(Err(anyhow!("failed to find git repository for buffer")));
646 };
647
648 let task = self
649 .loading_diffs
650 .entry((buffer_id, DiffKind::Uncommitted))
651 .or_insert_with(|| {
652 let changes = repo.update(cx, |repo, cx| {
653 repo.load_committed_text(buffer_id, repo_path, cx)
654 });
655
656 cx.spawn(async move |this, cx| {
657 Self::open_diff_internal(this, DiffKind::Uncommitted, changes.await, buffer, cx)
658 .await
659 .map_err(Arc::new)
660 })
661 .shared()
662 })
663 .clone();
664
665 cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
666 }
667
668 async fn open_diff_internal(
669 this: WeakEntity<Self>,
670 kind: DiffKind,
671 texts: Result<DiffBasesChange>,
672 buffer_entity: Entity<Buffer>,
673 cx: &mut AsyncApp,
674 ) -> Result<Entity<BufferDiff>> {
675 let diff_bases_change = match texts {
676 Err(e) => {
677 this.update(cx, |this, cx| {
678 let buffer = buffer_entity.read(cx);
679 let buffer_id = buffer.remote_id();
680 this.loading_diffs.remove(&(buffer_id, kind));
681 })?;
682 return Err(e);
683 }
684 Ok(change) => change,
685 };
686
687 this.update(cx, |this, cx| {
688 let buffer = buffer_entity.read(cx);
689 let buffer_id = buffer.remote_id();
690 let language = buffer.language().cloned();
691 let language_registry = buffer.language_registry();
692 let text_snapshot = buffer.text_snapshot();
693 this.loading_diffs.remove(&(buffer_id, kind));
694
695 let git_store = cx.weak_entity();
696 let diff_state = this
697 .diffs
698 .entry(buffer_id)
699 .or_insert_with(|| cx.new(|_| BufferGitState::new(git_store)));
700
701 let diff = cx.new(|cx| BufferDiff::new(&text_snapshot, cx));
702
703 cx.subscribe(&diff, Self::on_buffer_diff_event).detach();
704 diff_state.update(cx, |diff_state, cx| {
705 diff_state.language = language;
706 diff_state.language_registry = language_registry;
707
708 match kind {
709 DiffKind::Unstaged => diff_state.unstaged_diff = Some(diff.downgrade()),
710 DiffKind::Uncommitted => {
711 let unstaged_diff = if let Some(diff) = diff_state.unstaged_diff() {
712 diff
713 } else {
714 let unstaged_diff = cx.new(|cx| BufferDiff::new(&text_snapshot, cx));
715 diff_state.unstaged_diff = Some(unstaged_diff.downgrade());
716 unstaged_diff
717 };
718
719 diff.update(cx, |diff, _| diff.set_secondary_diff(unstaged_diff));
720 diff_state.uncommitted_diff = Some(diff.downgrade())
721 }
722 }
723
724 diff_state.diff_bases_changed(text_snapshot, Some(diff_bases_change), cx);
725 let rx = diff_state.wait_for_recalculation();
726
727 anyhow::Ok(async move {
728 if let Some(rx) = rx {
729 rx.await;
730 }
731 Ok(diff)
732 })
733 })
734 })??
735 .await
736 }
737
738 pub fn get_unstaged_diff(&self, buffer_id: BufferId, cx: &App) -> Option<Entity<BufferDiff>> {
739 let diff_state = self.diffs.get(&buffer_id)?;
740 diff_state.read(cx).unstaged_diff.as_ref()?.upgrade()
741 }
742
743 pub fn get_uncommitted_diff(
744 &self,
745 buffer_id: BufferId,
746 cx: &App,
747 ) -> Option<Entity<BufferDiff>> {
748 let diff_state = self.diffs.get(&buffer_id)?;
749 diff_state.read(cx).uncommitted_diff.as_ref()?.upgrade()
750 }
751
752 pub fn open_conflict_set(
753 &mut self,
754 buffer: Entity<Buffer>,
755 cx: &mut Context<Self>,
756 ) -> Entity<ConflictSet> {
757 log::debug!("open conflict set");
758 let buffer_id = buffer.read(cx).remote_id();
759
760 if let Some(git_state) = self.diffs.get(&buffer_id) {
761 if let Some(conflict_set) = git_state
762 .read(cx)
763 .conflict_set
764 .as_ref()
765 .and_then(|weak| weak.upgrade())
766 {
767 let conflict_set = conflict_set.clone();
768 let buffer_snapshot = buffer.read(cx).text_snapshot();
769
770 git_state.update(cx, |state, cx| {
771 let _ = state.reparse_conflict_markers(buffer_snapshot, cx);
772 });
773
774 return conflict_set;
775 }
776 }
777
778 let is_unmerged = self
779 .repository_and_path_for_buffer_id(buffer_id, cx)
780 .map_or(false, |(repo, path)| {
781 repo.read(cx).snapshot.has_conflict(&path)
782 });
783 let git_store = cx.weak_entity();
784 let buffer_git_state = self
785 .diffs
786 .entry(buffer_id)
787 .or_insert_with(|| cx.new(|_| BufferGitState::new(git_store)));
788 let conflict_set = cx.new(|cx| ConflictSet::new(buffer_id, is_unmerged, cx));
789
790 self._subscriptions
791 .push(cx.subscribe(&conflict_set, |_, _, _, cx| {
792 cx.emit(GitStoreEvent::ConflictsUpdated);
793 }));
794
795 buffer_git_state.update(cx, |state, cx| {
796 state.conflict_set = Some(conflict_set.downgrade());
797 let buffer_snapshot = buffer.read(cx).text_snapshot();
798 let _ = state.reparse_conflict_markers(buffer_snapshot, cx);
799 });
800
801 conflict_set
802 }
803
804 pub fn project_path_git_status(
805 &self,
806 project_path: &ProjectPath,
807 cx: &App,
808 ) -> Option<FileStatus> {
809 let (repo, repo_path) = self.repository_and_path_for_project_path(project_path, cx)?;
810 Some(repo.read(cx).status_for_path(&repo_path)?.status)
811 }
812
813 pub fn checkpoint(&self, cx: &mut App) -> Task<Result<GitStoreCheckpoint>> {
814 let mut work_directory_abs_paths = Vec::new();
815 let mut checkpoints = Vec::new();
816 for repository in self.repositories.values() {
817 repository.update(cx, |repository, _| {
818 work_directory_abs_paths.push(repository.snapshot.work_directory_abs_path.clone());
819 checkpoints.push(repository.checkpoint().map(|checkpoint| checkpoint?));
820 });
821 }
822
823 cx.background_executor().spawn(async move {
824 let checkpoints = future::try_join_all(checkpoints).await?;
825 Ok(GitStoreCheckpoint {
826 checkpoints_by_work_dir_abs_path: work_directory_abs_paths
827 .into_iter()
828 .zip(checkpoints)
829 .collect(),
830 })
831 })
832 }
833
834 pub fn restore_checkpoint(
835 &self,
836 checkpoint: GitStoreCheckpoint,
837 cx: &mut App,
838 ) -> Task<Result<()>> {
839 let repositories_by_work_dir_abs_path = self
840 .repositories
841 .values()
842 .map(|repo| (repo.read(cx).snapshot.work_directory_abs_path.clone(), repo))
843 .collect::<HashMap<_, _>>();
844
845 let mut tasks = Vec::new();
846 for (work_dir_abs_path, checkpoint) in checkpoint.checkpoints_by_work_dir_abs_path {
847 if let Some(repository) = repositories_by_work_dir_abs_path.get(&work_dir_abs_path) {
848 let restore = repository.update(cx, |repository, _| {
849 repository.restore_checkpoint(checkpoint)
850 });
851 tasks.push(async move { restore.await? });
852 }
853 }
854 cx.background_spawn(async move {
855 future::try_join_all(tasks).await?;
856 Ok(())
857 })
858 }
859
860 /// Compares two checkpoints, returning true if they are equal.
861 pub fn compare_checkpoints(
862 &self,
863 left: GitStoreCheckpoint,
864 mut right: GitStoreCheckpoint,
865 cx: &mut App,
866 ) -> Task<Result<bool>> {
867 let repositories_by_work_dir_abs_path = self
868 .repositories
869 .values()
870 .map(|repo| (repo.read(cx).snapshot.work_directory_abs_path.clone(), repo))
871 .collect::<HashMap<_, _>>();
872
873 let mut tasks = Vec::new();
874 for (work_dir_abs_path, left_checkpoint) in left.checkpoints_by_work_dir_abs_path {
875 if let Some(right_checkpoint) = right
876 .checkpoints_by_work_dir_abs_path
877 .remove(&work_dir_abs_path)
878 {
879 if let Some(repository) = repositories_by_work_dir_abs_path.get(&work_dir_abs_path)
880 {
881 let compare = repository.update(cx, |repository, _| {
882 repository.compare_checkpoints(left_checkpoint, right_checkpoint)
883 });
884
885 tasks.push(async move { compare.await? });
886 }
887 } else {
888 return Task::ready(Ok(false));
889 }
890 }
891 cx.background_spawn(async move {
892 Ok(future::try_join_all(tasks)
893 .await?
894 .into_iter()
895 .all(|result| result))
896 })
897 }
898
899 /// Blames a buffer.
900 pub fn blame_buffer(
901 &self,
902 buffer: &Entity<Buffer>,
903 version: Option<clock::Global>,
904 cx: &mut App,
905 ) -> Task<Result<Option<Blame>>> {
906 let buffer = buffer.read(cx);
907 let Some((repo, repo_path)) =
908 self.repository_and_path_for_buffer_id(buffer.remote_id(), cx)
909 else {
910 return Task::ready(Err(anyhow!("failed to find a git repository for buffer")));
911 };
912 let content = match &version {
913 Some(version) => buffer.rope_for_version(version).clone(),
914 None => buffer.as_rope().clone(),
915 };
916 let version = version.unwrap_or(buffer.version());
917 let buffer_id = buffer.remote_id();
918
919 let rx = repo.update(cx, |repo, _| {
920 repo.send_job(None, move |state, _| async move {
921 match state {
922 RepositoryState::Local { backend, .. } => backend
923 .blame(repo_path.clone(), content)
924 .await
925 .with_context(|| format!("Failed to blame {:?}", repo_path.0))
926 .map(Some),
927 RepositoryState::Remote { project_id, client } => {
928 let response = client
929 .request(proto::BlameBuffer {
930 project_id: project_id.to_proto(),
931 buffer_id: buffer_id.into(),
932 version: serialize_version(&version),
933 })
934 .await?;
935 Ok(deserialize_blame_buffer_response(response))
936 }
937 }
938 })
939 });
940
941 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
942 }
943
944 pub fn get_permalink_to_line(
945 &self,
946 buffer: &Entity<Buffer>,
947 selection: Range<u32>,
948 cx: &mut App,
949 ) -> Task<Result<url::Url>> {
950 let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
951 return Task::ready(Err(anyhow!("buffer has no file")));
952 };
953
954 let Some((repo, repo_path)) = self.repository_and_path_for_project_path(
955 &(file.worktree.read(cx).id(), file.path.clone()).into(),
956 cx,
957 ) else {
958 // If we're not in a Git repo, check whether this is a Rust source
959 // file in the Cargo registry (presumably opened with go-to-definition
960 // from a normal Rust file). If so, we can put together a permalink
961 // using crate metadata.
962 if buffer
963 .read(cx)
964 .language()
965 .is_none_or(|lang| lang.name() != "Rust".into())
966 {
967 return Task::ready(Err(anyhow!("no permalink available")));
968 }
969 let Some(file_path) = file.worktree.read(cx).absolutize(&file.path).ok() else {
970 return Task::ready(Err(anyhow!("no permalink available")));
971 };
972 return cx.spawn(async move |cx| {
973 let provider_registry = cx.update(GitHostingProviderRegistry::default_global)?;
974 get_permalink_in_rust_registry_src(provider_registry, file_path, selection)
975 .context("no permalink available")
976 });
977
978 // TODO remote case
979 };
980
981 let buffer_id = buffer.read(cx).remote_id();
982 let branch = repo.read(cx).branch.clone();
983 let remote = branch
984 .as_ref()
985 .and_then(|b| b.upstream.as_ref())
986 .and_then(|b| b.remote_name())
987 .unwrap_or("origin")
988 .to_string();
989
990 let rx = repo.update(cx, |repo, _| {
991 repo.send_job(None, move |state, cx| async move {
992 match state {
993 RepositoryState::Local { backend, .. } => {
994 let origin_url = backend
995 .remote_url(&remote)
996 .with_context(|| format!("remote \"{remote}\" not found"))?;
997
998 let sha = backend.head_sha().await.context("reading HEAD SHA")?;
999
1000 let provider_registry =
1001 cx.update(GitHostingProviderRegistry::default_global)?;
1002
1003 let (provider, remote) =
1004 parse_git_remote_url(provider_registry, &origin_url)
1005 .context("parsing Git remote URL")?;
1006
1007 let path = repo_path.to_str().with_context(|| {
1008 format!("converting repo path {repo_path:?} to string")
1009 })?;
1010
1011 Ok(provider.build_permalink(
1012 remote,
1013 BuildPermalinkParams {
1014 sha: &sha,
1015 path,
1016 selection: Some(selection),
1017 },
1018 ))
1019 }
1020 RepositoryState::Remote { project_id, client } => {
1021 let response = client
1022 .request(proto::GetPermalinkToLine {
1023 project_id: project_id.to_proto(),
1024 buffer_id: buffer_id.into(),
1025 selection: Some(proto::Range {
1026 start: selection.start as u64,
1027 end: selection.end as u64,
1028 }),
1029 })
1030 .await?;
1031
1032 url::Url::parse(&response.permalink).context("failed to parse permalink")
1033 }
1034 }
1035 })
1036 });
1037 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
1038 }
1039
1040 fn downstream_client(&self) -> Option<(AnyProtoClient, ProjectId)> {
1041 match &self.state {
1042 GitStoreState::Local {
1043 downstream: downstream_client,
1044 ..
1045 } => downstream_client
1046 .as_ref()
1047 .map(|state| (state.client.clone(), state.project_id)),
1048 GitStoreState::Ssh {
1049 downstream: downstream_client,
1050 ..
1051 } => downstream_client.clone(),
1052 GitStoreState::Remote { .. } => None,
1053 }
1054 }
1055
1056 fn upstream_client(&self) -> Option<AnyProtoClient> {
1057 match &self.state {
1058 GitStoreState::Local { .. } => None,
1059 GitStoreState::Ssh {
1060 upstream_client, ..
1061 }
1062 | GitStoreState::Remote {
1063 upstream_client, ..
1064 } => Some(upstream_client.clone()),
1065 }
1066 }
1067
1068 fn on_worktree_store_event(
1069 &mut self,
1070 worktree_store: Entity<WorktreeStore>,
1071 event: &WorktreeStoreEvent,
1072 cx: &mut Context<Self>,
1073 ) {
1074 let GitStoreState::Local {
1075 project_environment,
1076 downstream,
1077 next_repository_id,
1078 fs,
1079 } = &self.state
1080 else {
1081 return;
1082 };
1083
1084 match event {
1085 WorktreeStoreEvent::WorktreeUpdatedEntries(worktree_id, updated_entries) => {
1086 let mut paths_by_git_repo = HashMap::<_, Vec<_>>::default();
1087 for (relative_path, _, _) in updated_entries.iter() {
1088 let Some((repo, repo_path)) = self.repository_and_path_for_project_path(
1089 &(*worktree_id, relative_path.clone()).into(),
1090 cx,
1091 ) else {
1092 continue;
1093 };
1094 paths_by_git_repo.entry(repo).or_default().push(repo_path)
1095 }
1096
1097 for (repo, paths) in paths_by_git_repo {
1098 repo.update(cx, |repo, cx| {
1099 repo.paths_changed(
1100 paths,
1101 downstream
1102 .as_ref()
1103 .map(|downstream| downstream.updates_tx.clone()),
1104 cx,
1105 );
1106 });
1107 }
1108 }
1109 WorktreeStoreEvent::WorktreeUpdatedGitRepositories(worktree_id, changed_repos) => {
1110 let Some(worktree) = worktree_store.read(cx).worktree_for_id(*worktree_id, cx)
1111 else {
1112 return;
1113 };
1114 if !worktree.read(cx).is_visible() {
1115 log::debug!(
1116 "not adding repositories for local worktree {:?} because it's not visible",
1117 worktree.read(cx).abs_path()
1118 );
1119 return;
1120 }
1121 self.update_repositories_from_worktree(
1122 project_environment.clone(),
1123 next_repository_id.clone(),
1124 downstream
1125 .as_ref()
1126 .map(|downstream| downstream.updates_tx.clone()),
1127 changed_repos.clone(),
1128 fs.clone(),
1129 cx,
1130 );
1131 self.local_worktree_git_repos_changed(worktree, changed_repos, cx);
1132 }
1133 _ => {}
1134 }
1135 }
1136
1137 fn on_repository_event(
1138 &mut self,
1139 repo: Entity<Repository>,
1140 event: &RepositoryEvent,
1141 cx: &mut Context<Self>,
1142 ) {
1143 let id = repo.read(cx).id;
1144 let repo_snapshot = repo.read(cx).snapshot.clone();
1145 for (buffer_id, diff) in self.diffs.iter() {
1146 if let Some((buffer_repo, repo_path)) =
1147 self.repository_and_path_for_buffer_id(*buffer_id, cx)
1148 {
1149 if buffer_repo == repo {
1150 diff.update(cx, |diff, cx| {
1151 if let Some(conflict_set) = &diff.conflict_set {
1152 let conflict_status_changed =
1153 conflict_set.update(cx, |conflict_set, cx| {
1154 let has_conflict = repo_snapshot.has_conflict(&repo_path);
1155 conflict_set.set_has_conflict(has_conflict, cx)
1156 })?;
1157 if conflict_status_changed {
1158 let buffer_store = self.buffer_store.read(cx);
1159 if let Some(buffer) = buffer_store.get(*buffer_id) {
1160 let _ = diff.reparse_conflict_markers(
1161 buffer.read(cx).text_snapshot(),
1162 cx,
1163 );
1164 }
1165 }
1166 }
1167 anyhow::Ok(())
1168 })
1169 .ok();
1170 }
1171 }
1172 }
1173 cx.emit(GitStoreEvent::RepositoryUpdated(
1174 id,
1175 event.clone(),
1176 self.active_repo_id == Some(id),
1177 ))
1178 }
1179
1180 fn on_jobs_updated(&mut self, _: Entity<Repository>, _: &JobsUpdated, cx: &mut Context<Self>) {
1181 cx.emit(GitStoreEvent::JobsUpdated)
1182 }
1183
    /// Update our list of repositories and schedule git scans in response to a notification from a worktree.
1185 fn update_repositories_from_worktree(
1186 &mut self,
1187 project_environment: Entity<ProjectEnvironment>,
1188 next_repository_id: Arc<AtomicU64>,
1189 updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
1190 updated_git_repositories: UpdatedGitRepositoriesSet,
1191 fs: Arc<dyn Fs>,
1192 cx: &mut Context<Self>,
1193 ) {
1194 let mut removed_ids = Vec::new();
1195 for update in updated_git_repositories.iter() {
1196 if let Some((id, existing)) = self.repositories.iter().find(|(_, repo)| {
1197 let existing_work_directory_abs_path =
1198 repo.read(cx).work_directory_abs_path.clone();
1199 Some(&existing_work_directory_abs_path)
1200 == update.old_work_directory_abs_path.as_ref()
1201 || Some(&existing_work_directory_abs_path)
1202 == update.new_work_directory_abs_path.as_ref()
1203 }) {
1204 if let Some(new_work_directory_abs_path) =
1205 update.new_work_directory_abs_path.clone()
1206 {
1207 existing.update(cx, |existing, cx| {
1208 existing.snapshot.work_directory_abs_path = new_work_directory_abs_path;
1209 existing.schedule_scan(updates_tx.clone(), cx);
1210 });
1211 } else {
1212 removed_ids.push(*id);
1213 }
1214 } else if let UpdatedGitRepository {
1215 new_work_directory_abs_path: Some(work_directory_abs_path),
1216 dot_git_abs_path: Some(dot_git_abs_path),
1217 repository_dir_abs_path: Some(repository_dir_abs_path),
1218 common_dir_abs_path: Some(common_dir_abs_path),
1219 ..
1220 } = update
1221 {
1222 let id = RepositoryId(next_repository_id.fetch_add(1, atomic::Ordering::Release));
1223 let git_store = cx.weak_entity();
1224 let repo = cx.new(|cx| {
1225 let mut repo = Repository::local(
1226 id,
1227 work_directory_abs_path.clone(),
1228 dot_git_abs_path.clone(),
1229 repository_dir_abs_path.clone(),
1230 common_dir_abs_path.clone(),
1231 project_environment.downgrade(),
1232 fs.clone(),
1233 git_store,
1234 cx,
1235 );
1236 repo.schedule_scan(updates_tx.clone(), cx);
1237 repo
1238 });
1239 self._subscriptions
1240 .push(cx.subscribe(&repo, Self::on_repository_event));
1241 self._subscriptions
1242 .push(cx.subscribe(&repo, Self::on_jobs_updated));
1243 self.repositories.insert(id, repo);
1244 cx.emit(GitStoreEvent::RepositoryAdded(id));
1245 self.active_repo_id.get_or_insert_with(|| {
1246 cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
1247 id
1248 });
1249 }
1250 }
1251
1252 for id in removed_ids {
1253 if self.active_repo_id == Some(id) {
1254 self.active_repo_id = None;
1255 cx.emit(GitStoreEvent::ActiveRepositoryChanged(None));
1256 }
1257 self.repositories.remove(&id);
1258 if let Some(updates_tx) = updates_tx.as_ref() {
1259 updates_tx
1260 .unbounded_send(DownstreamUpdate::RemoveRepository(id))
1261 .ok();
1262 }
1263 }
1264 }
1265
1266 fn on_buffer_store_event(
1267 &mut self,
1268 _: Entity<BufferStore>,
1269 event: &BufferStoreEvent,
1270 cx: &mut Context<Self>,
1271 ) {
1272 match event {
1273 BufferStoreEvent::BufferAdded(buffer) => {
1274 cx.subscribe(&buffer, |this, buffer, event, cx| {
1275 if let BufferEvent::LanguageChanged = event {
1276 let buffer_id = buffer.read(cx).remote_id();
1277 if let Some(diff_state) = this.diffs.get(&buffer_id) {
1278 diff_state.update(cx, |diff_state, cx| {
1279 diff_state.buffer_language_changed(buffer, cx);
1280 });
1281 }
1282 }
1283 })
1284 .detach();
1285 }
1286 BufferStoreEvent::SharedBufferClosed(peer_id, buffer_id) => {
1287 if let Some(diffs) = self.shared_diffs.get_mut(peer_id) {
1288 diffs.remove(buffer_id);
1289 }
1290 }
1291 BufferStoreEvent::BufferDropped(buffer_id) => {
                self.diffs.remove(buffer_id);
1293 for diffs in self.shared_diffs.values_mut() {
1294 diffs.remove(buffer_id);
1295 }
1296 }
1297
1298 _ => {}
1299 }
1300 }
1301
1302 pub fn recalculate_buffer_diffs(
1303 &mut self,
1304 buffers: Vec<Entity<Buffer>>,
1305 cx: &mut Context<Self>,
1306 ) -> impl Future<Output = ()> + use<> {
1307 let mut futures = Vec::new();
1308 for buffer in buffers {
1309 if let Some(diff_state) = self.diffs.get_mut(&buffer.read(cx).remote_id()) {
1310 let buffer = buffer.read(cx).text_snapshot();
1311 diff_state.update(cx, |diff_state, cx| {
1312 diff_state.recalculate_diffs(buffer.clone(), cx);
1313 futures.extend(diff_state.wait_for_recalculation().map(FutureExt::boxed));
1314 });
1315 futures.push(diff_state.update(cx, |diff_state, cx| {
1316 diff_state
1317 .reparse_conflict_markers(buffer, cx)
1318 .map(|_| {})
1319 .boxed()
1320 }));
1321 }
1322 }
1323 async move {
1324 futures::future::join_all(futures).await;
1325 }
1326 }
1327
1328 fn on_buffer_diff_event(
1329 &mut self,
1330 diff: Entity<buffer_diff::BufferDiff>,
1331 event: &BufferDiffEvent,
1332 cx: &mut Context<Self>,
1333 ) {
1334 if let BufferDiffEvent::HunksStagedOrUnstaged(new_index_text) = event {
1335 let buffer_id = diff.read(cx).buffer_id;
1336 if let Some(diff_state) = self.diffs.get(&buffer_id) {
1337 let hunk_staging_operation_count = diff_state.update(cx, |diff_state, _| {
1338 diff_state.hunk_staging_operation_count += 1;
1339 diff_state.hunk_staging_operation_count
1340 });
1341 if let Some((repo, path)) = self.repository_and_path_for_buffer_id(buffer_id, cx) {
1342 let recv = repo.update(cx, |repo, cx| {
1343 log::debug!("hunks changed for {}", path.display());
1344 repo.spawn_set_index_text_job(
1345 path,
1346 new_index_text.as_ref().map(|rope| rope.to_string()),
1347 Some(hunk_staging_operation_count),
1348 cx,
1349 )
1350 });
1351 let diff = diff.downgrade();
1352 cx.spawn(async move |this, cx| {
1353 if let Ok(Err(error)) = cx.background_spawn(recv).await {
1354 diff.update(cx, |diff, cx| {
1355 diff.clear_pending_hunks(cx);
1356 })
1357 .ok();
1358 this.update(cx, |_, cx| cx.emit(GitStoreEvent::IndexWriteError(error)))
1359 .ok();
1360 }
1361 })
1362 .detach();
1363 }
1364 }
1365 }
1366 }
1367
1368 fn local_worktree_git_repos_changed(
1369 &mut self,
1370 worktree: Entity<Worktree>,
1371 changed_repos: &UpdatedGitRepositoriesSet,
1372 cx: &mut Context<Self>,
1373 ) {
1374 log::debug!("local worktree repos changed");
1375 debug_assert!(worktree.read(cx).is_local());
1376
1377 for repository in self.repositories.values() {
1378 repository.update(cx, |repository, cx| {
1379 let repo_abs_path = &repository.work_directory_abs_path;
1380 if changed_repos.iter().any(|update| {
                    update.old_work_directory_abs_path.as_ref() == Some(repo_abs_path)
                        || update.new_work_directory_abs_path.as_ref() == Some(repo_abs_path)
1383 }) {
1384 repository.reload_buffer_diff_bases(cx);
1385 }
1386 });
1387 }
1388 }
1389
1390 pub fn repositories(&self) -> &HashMap<RepositoryId, Entity<Repository>> {
1391 &self.repositories
1392 }
1393
1394 pub fn status_for_buffer_id(&self, buffer_id: BufferId, cx: &App) -> Option<FileStatus> {
1395 let (repo, path) = self.repository_and_path_for_buffer_id(buffer_id, cx)?;
1396 let status = repo.read(cx).snapshot.status_for_path(&path)?;
1397 Some(status.status)
1398 }
1399
1400 pub fn repository_and_path_for_buffer_id(
1401 &self,
1402 buffer_id: BufferId,
1403 cx: &App,
1404 ) -> Option<(Entity<Repository>, RepoPath)> {
1405 let buffer = self.buffer_store.read(cx).get(buffer_id)?;
1406 let project_path = buffer.read(cx).project_path(cx)?;
1407 self.repository_and_path_for_project_path(&project_path, cx)
1408 }
1409
1410 pub fn repository_and_path_for_project_path(
1411 &self,
1412 path: &ProjectPath,
1413 cx: &App,
1414 ) -> Option<(Entity<Repository>, RepoPath)> {
1415 let abs_path = self.worktree_store.read(cx).absolutize(path, cx)?;
1416 self.repositories
1417 .values()
1418 .filter_map(|repo| {
1419 let repo_path = repo.read(cx).abs_path_to_repo_path(&abs_path)?;
1420 Some((repo.clone(), repo_path))
1421 })
1422 .max_by_key(|(repo, _)| repo.read(cx).work_directory_abs_path.clone())
1423 }
1424
1425 pub fn git_init(
1426 &self,
1427 path: Arc<Path>,
1428 fallback_branch_name: String,
1429 cx: &App,
1430 ) -> Task<Result<()>> {
1431 match &self.state {
1432 GitStoreState::Local { fs, .. } => {
1433 let fs = fs.clone();
1434 cx.background_executor()
1435 .spawn(async move { fs.git_init(&path, fallback_branch_name) })
1436 }
1437 GitStoreState::Ssh {
1438 upstream_client,
1439 upstream_project_id: project_id,
1440 ..
1441 }
1442 | GitStoreState::Remote {
1443 upstream_client,
1444 upstream_project_id: project_id,
1445 ..
1446 } => {
1447 let client = upstream_client.clone();
1448 let project_id = *project_id;
1449 cx.background_executor().spawn(async move {
1450 client
1451 .request(proto::GitInit {
1452 project_id: project_id.0,
1453 abs_path: path.to_string_lossy().to_string(),
1454 fallback_branch_name,
1455 })
1456 .await?;
1457 Ok(())
1458 })
1459 }
1460 }
1461 }
1462
1463 async fn handle_update_repository(
1464 this: Entity<Self>,
1465 envelope: TypedEnvelope<proto::UpdateRepository>,
1466 mut cx: AsyncApp,
1467 ) -> Result<()> {
1468 this.update(&mut cx, |this, cx| {
1469 let mut update = envelope.payload;
1470
1471 let id = RepositoryId::from_proto(update.id);
1472 let client = this
1473 .upstream_client()
1474 .context("no upstream client")?
1475 .clone();
1476
1477 let mut is_new = false;
1478 let repo = this.repositories.entry(id).or_insert_with(|| {
1479 is_new = true;
1480 let git_store = cx.weak_entity();
1481 cx.new(|cx| {
1482 Repository::remote(
1483 id,
1484 Path::new(&update.abs_path).into(),
1485 ProjectId(update.project_id),
1486 client,
1487 git_store,
1488 cx,
1489 )
1490 })
1491 });
1492 if is_new {
1493 this._subscriptions
1494 .push(cx.subscribe(&repo, Self::on_repository_event))
1495 }
1496
1497 repo.update(cx, {
1498 let update = update.clone();
1499 |repo, cx| repo.apply_remote_update(update, cx)
1500 })?;
1501
1502 this.active_repo_id.get_or_insert_with(|| {
1503 cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
1504 id
1505 });
1506
1507 if let Some((client, project_id)) = this.downstream_client() {
1508 update.project_id = project_id.to_proto();
1509 client.send(update).log_err();
1510 }
1511 Ok(())
1512 })?
1513 }
1514
1515 async fn handle_remove_repository(
1516 this: Entity<Self>,
1517 envelope: TypedEnvelope<proto::RemoveRepository>,
1518 mut cx: AsyncApp,
1519 ) -> Result<()> {
1520 this.update(&mut cx, |this, cx| {
1521 let mut update = envelope.payload;
1522 let id = RepositoryId::from_proto(update.id);
1523 this.repositories.remove(&id);
1524 if let Some((client, project_id)) = this.downstream_client() {
1525 update.project_id = project_id.to_proto();
1526 client.send(update).log_err();
1527 }
1528 if this.active_repo_id == Some(id) {
1529 this.active_repo_id = None;
1530 cx.emit(GitStoreEvent::ActiveRepositoryChanged(None));
1531 }
1532 cx.emit(GitStoreEvent::RepositoryRemoved(id));
1533 })
1534 }
1535
1536 async fn handle_git_init(
1537 this: Entity<Self>,
1538 envelope: TypedEnvelope<proto::GitInit>,
1539 cx: AsyncApp,
1540 ) -> Result<proto::Ack> {
1541 let path: Arc<Path> = PathBuf::from(envelope.payload.abs_path).into();
1542 let name = envelope.payload.fallback_branch_name;
1543 cx.update(|cx| this.read(cx).git_init(path, name, cx))?
1544 .await?;
1545
1546 Ok(proto::Ack {})
1547 }
1548
1549 async fn handle_fetch(
1550 this: Entity<Self>,
1551 envelope: TypedEnvelope<proto::Fetch>,
1552 mut cx: AsyncApp,
1553 ) -> Result<proto::RemoteMessageResponse> {
1554 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1555 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1556 let askpass_id = envelope.payload.askpass_id;
1557
1558 let askpass = make_remote_delegate(
1559 this,
1560 envelope.payload.project_id,
1561 repository_id,
1562 askpass_id,
1563 &mut cx,
1564 );
1565
1566 let remote_output = repository_handle
1567 .update(&mut cx, |repository_handle, cx| {
1568 repository_handle.fetch(askpass, cx)
1569 })?
1570 .await??;
1571
1572 Ok(proto::RemoteMessageResponse {
1573 stdout: remote_output.stdout,
1574 stderr: remote_output.stderr,
1575 })
1576 }
1577
1578 async fn handle_push(
1579 this: Entity<Self>,
1580 envelope: TypedEnvelope<proto::Push>,
1581 mut cx: AsyncApp,
1582 ) -> Result<proto::RemoteMessageResponse> {
1583 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1584 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1585
1586 let askpass_id = envelope.payload.askpass_id;
1587 let askpass = make_remote_delegate(
1588 this,
1589 envelope.payload.project_id,
1590 repository_id,
1591 askpass_id,
1592 &mut cx,
1593 );
1594
1595 let options = envelope
1596 .payload
1597 .options
1598 .as_ref()
1599 .map(|_| match envelope.payload.options() {
1600 proto::push::PushOptions::SetUpstream => git::repository::PushOptions::SetUpstream,
1601 proto::push::PushOptions::Force => git::repository::PushOptions::Force,
1602 });
1603
1604 let branch_name = envelope.payload.branch_name.into();
1605 let remote_name = envelope.payload.remote_name.into();
1606
1607 let remote_output = repository_handle
1608 .update(&mut cx, |repository_handle, cx| {
1609 repository_handle.push(branch_name, remote_name, options, askpass, cx)
1610 })?
1611 .await??;
1612 Ok(proto::RemoteMessageResponse {
1613 stdout: remote_output.stdout,
1614 stderr: remote_output.stderr,
1615 })
1616 }
1617
1618 async fn handle_pull(
1619 this: Entity<Self>,
1620 envelope: TypedEnvelope<proto::Pull>,
1621 mut cx: AsyncApp,
1622 ) -> Result<proto::RemoteMessageResponse> {
1623 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1624 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1625 let askpass_id = envelope.payload.askpass_id;
1626 let askpass = make_remote_delegate(
1627 this,
1628 envelope.payload.project_id,
1629 repository_id,
1630 askpass_id,
1631 &mut cx,
1632 );
1633
1634 let branch_name = envelope.payload.branch_name.into();
1635 let remote_name = envelope.payload.remote_name.into();
1636
1637 let remote_message = repository_handle
1638 .update(&mut cx, |repository_handle, cx| {
1639 repository_handle.pull(branch_name, remote_name, askpass, cx)
1640 })?
1641 .await??;
1642
1643 Ok(proto::RemoteMessageResponse {
1644 stdout: remote_message.stdout,
1645 stderr: remote_message.stderr,
1646 })
1647 }
1648
1649 async fn handle_stage(
1650 this: Entity<Self>,
1651 envelope: TypedEnvelope<proto::Stage>,
1652 mut cx: AsyncApp,
1653 ) -> Result<proto::Ack> {
1654 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1655 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1656
1657 let entries = envelope
1658 .payload
1659 .paths
1660 .into_iter()
1661 .map(PathBuf::from)
1662 .map(RepoPath::new)
1663 .collect();
1664
1665 repository_handle
1666 .update(&mut cx, |repository_handle, cx| {
1667 repository_handle.stage_entries(entries, cx)
1668 })?
1669 .await?;
1670 Ok(proto::Ack {})
1671 }
1672
1673 async fn handle_unstage(
1674 this: Entity<Self>,
1675 envelope: TypedEnvelope<proto::Unstage>,
1676 mut cx: AsyncApp,
1677 ) -> Result<proto::Ack> {
1678 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1679 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1680
1681 let entries = envelope
1682 .payload
1683 .paths
1684 .into_iter()
1685 .map(PathBuf::from)
1686 .map(RepoPath::new)
1687 .collect();
1688
1689 repository_handle
1690 .update(&mut cx, |repository_handle, cx| {
1691 repository_handle.unstage_entries(entries, cx)
1692 })?
1693 .await?;
1694
1695 Ok(proto::Ack {})
1696 }
1697
1698 async fn handle_set_index_text(
1699 this: Entity<Self>,
1700 envelope: TypedEnvelope<proto::SetIndexText>,
1701 mut cx: AsyncApp,
1702 ) -> Result<proto::Ack> {
1703 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1704 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1705 let repo_path = RepoPath::from_str(&envelope.payload.path);
1706
1707 repository_handle
1708 .update(&mut cx, |repository_handle, cx| {
1709 repository_handle.spawn_set_index_text_job(
1710 repo_path,
1711 envelope.payload.text,
1712 None,
1713 cx,
1714 )
1715 })?
1716 .await??;
1717 Ok(proto::Ack {})
1718 }
1719
1720 async fn handle_commit(
1721 this: Entity<Self>,
1722 envelope: TypedEnvelope<proto::Commit>,
1723 mut cx: AsyncApp,
1724 ) -> Result<proto::Ack> {
1725 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1726 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1727
1728 let message = SharedString::from(envelope.payload.message);
1729 let name = envelope.payload.name.map(SharedString::from);
1730 let email = envelope.payload.email.map(SharedString::from);
1731 let options = envelope.payload.options.unwrap_or_default();
1732
1733 repository_handle
1734 .update(&mut cx, |repository_handle, cx| {
1735 repository_handle.commit(
1736 message,
1737 name.zip(email),
1738 CommitOptions {
1739 amend: options.amend,
1740 },
1741 cx,
1742 )
1743 })?
1744 .await??;
1745 Ok(proto::Ack {})
1746 }
1747
1748 async fn handle_get_remotes(
1749 this: Entity<Self>,
1750 envelope: TypedEnvelope<proto::GetRemotes>,
1751 mut cx: AsyncApp,
1752 ) -> Result<proto::GetRemotesResponse> {
1753 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1754 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1755
1756 let branch_name = envelope.payload.branch_name;
1757
1758 let remotes = repository_handle
1759 .update(&mut cx, |repository_handle, _| {
1760 repository_handle.get_remotes(branch_name)
1761 })?
1762 .await??;
1763
1764 Ok(proto::GetRemotesResponse {
1765 remotes: remotes
1766 .into_iter()
1767 .map(|remotes| proto::get_remotes_response::Remote {
1768 name: remotes.name.to_string(),
1769 })
1770 .collect::<Vec<_>>(),
1771 })
1772 }
1773
1774 async fn handle_get_branches(
1775 this: Entity<Self>,
1776 envelope: TypedEnvelope<proto::GitGetBranches>,
1777 mut cx: AsyncApp,
1778 ) -> Result<proto::GitBranchesResponse> {
1779 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1780 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1781
1782 let branches = repository_handle
1783 .update(&mut cx, |repository_handle, _| repository_handle.branches())?
1784 .await??;
1785
1786 Ok(proto::GitBranchesResponse {
1787 branches: branches
1788 .into_iter()
1789 .map(|branch| branch_to_proto(&branch))
1790 .collect::<Vec<_>>(),
1791 })
    }

1793 async fn handle_create_branch(
1794 this: Entity<Self>,
1795 envelope: TypedEnvelope<proto::GitCreateBranch>,
1796 mut cx: AsyncApp,
1797 ) -> Result<proto::Ack> {
1798 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1799 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1800 let branch_name = envelope.payload.branch_name;
1801
1802 repository_handle
1803 .update(&mut cx, |repository_handle, _| {
1804 repository_handle.create_branch(branch_name)
1805 })?
1806 .await??;
1807
1808 Ok(proto::Ack {})
1809 }
1810
1811 async fn handle_change_branch(
1812 this: Entity<Self>,
1813 envelope: TypedEnvelope<proto::GitChangeBranch>,
1814 mut cx: AsyncApp,
1815 ) -> Result<proto::Ack> {
1816 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1817 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1818 let branch_name = envelope.payload.branch_name;
1819
1820 repository_handle
1821 .update(&mut cx, |repository_handle, _| {
1822 repository_handle.change_branch(branch_name)
1823 })?
1824 .await??;
1825
1826 Ok(proto::Ack {})
1827 }
1828
1829 async fn handle_show(
1830 this: Entity<Self>,
1831 envelope: TypedEnvelope<proto::GitShow>,
1832 mut cx: AsyncApp,
1833 ) -> Result<proto::GitCommitDetails> {
1834 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1835 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1836
1837 let commit = repository_handle
1838 .update(&mut cx, |repository_handle, _| {
1839 repository_handle.show(envelope.payload.commit)
1840 })?
1841 .await??;
1842 Ok(proto::GitCommitDetails {
1843 sha: commit.sha.into(),
1844 message: commit.message.into(),
1845 commit_timestamp: commit.commit_timestamp,
1846 author_email: commit.author_email.into(),
1847 author_name: commit.author_name.into(),
1848 })
1849 }
1850
1851 async fn handle_load_commit_diff(
1852 this: Entity<Self>,
1853 envelope: TypedEnvelope<proto::LoadCommitDiff>,
1854 mut cx: AsyncApp,
1855 ) -> Result<proto::LoadCommitDiffResponse> {
1856 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1857 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1858
1859 let commit_diff = repository_handle
1860 .update(&mut cx, |repository_handle, _| {
1861 repository_handle.load_commit_diff(envelope.payload.commit)
1862 })?
1863 .await??;
1864 Ok(proto::LoadCommitDiffResponse {
1865 files: commit_diff
1866 .files
1867 .into_iter()
1868 .map(|file| proto::CommitFile {
1869 path: file.path.to_string(),
1870 old_text: file.old_text,
1871 new_text: file.new_text,
1872 })
1873 .collect(),
1874 })
1875 }
1876
1877 async fn handle_reset(
1878 this: Entity<Self>,
1879 envelope: TypedEnvelope<proto::GitReset>,
1880 mut cx: AsyncApp,
1881 ) -> Result<proto::Ack> {
1882 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1883 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1884
1885 let mode = match envelope.payload.mode() {
1886 git_reset::ResetMode::Soft => ResetMode::Soft,
1887 git_reset::ResetMode::Mixed => ResetMode::Mixed,
1888 };
1889
1890 repository_handle
1891 .update(&mut cx, |repository_handle, cx| {
1892 repository_handle.reset(envelope.payload.commit, mode, cx)
1893 })?
1894 .await??;
1895 Ok(proto::Ack {})
1896 }
1897
1898 async fn handle_checkout_files(
1899 this: Entity<Self>,
1900 envelope: TypedEnvelope<proto::GitCheckoutFiles>,
1901 mut cx: AsyncApp,
1902 ) -> Result<proto::Ack> {
1903 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1904 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1905 let paths = envelope
1906 .payload
1907 .paths
1908 .iter()
1909 .map(|s| RepoPath::from_str(s))
1910 .collect();
1911
1912 repository_handle
1913 .update(&mut cx, |repository_handle, cx| {
1914 repository_handle.checkout_files(&envelope.payload.commit, paths, cx)
1915 })?
1916 .await??;
1917 Ok(proto::Ack {})
1918 }
1919
1920 async fn handle_open_commit_message_buffer(
1921 this: Entity<Self>,
1922 envelope: TypedEnvelope<proto::OpenCommitMessageBuffer>,
1923 mut cx: AsyncApp,
1924 ) -> Result<proto::OpenBufferResponse> {
1925 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1926 let repository = Self::repository_for_request(&this, repository_id, &mut cx)?;
1927 let buffer = repository
1928 .update(&mut cx, |repository, cx| {
1929 repository.open_commit_buffer(None, this.read(cx).buffer_store.clone(), cx)
1930 })?
1931 .await?;
1932
1933 let buffer_id = buffer.read_with(&cx, |buffer, _| buffer.remote_id())?;
1934 this.update(&mut cx, |this, cx| {
1935 this.buffer_store.update(cx, |buffer_store, cx| {
1936 buffer_store
1937 .create_buffer_for_peer(
1938 &buffer,
1939 envelope.original_sender_id.unwrap_or(envelope.sender_id),
1940 cx,
1941 )
1942 .detach_and_log_err(cx);
1943 })
1944 })?;
1945
1946 Ok(proto::OpenBufferResponse {
1947 buffer_id: buffer_id.to_proto(),
1948 })
1949 }
1950
1951 async fn handle_askpass(
1952 this: Entity<Self>,
1953 envelope: TypedEnvelope<proto::AskPassRequest>,
1954 mut cx: AsyncApp,
1955 ) -> Result<proto::AskPassResponse> {
1956 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1957 let repository = Self::repository_for_request(&this, repository_id, &mut cx)?;
1958
1959 let delegates = cx.update(|cx| repository.read(cx).askpass_delegates.clone())?;
1960 let Some(mut askpass) = delegates.lock().remove(&envelope.payload.askpass_id) else {
1961 debug_panic!("no askpass found");
1962 anyhow::bail!("no askpass found");
1963 };
1964
1965 let response = askpass.ask_password(envelope.payload.prompt).await?;
1966
1967 delegates
1968 .lock()
1969 .insert(envelope.payload.askpass_id, askpass);
1970
1971 Ok(proto::AskPassResponse { response })
1972 }
1973
1974 async fn handle_check_for_pushed_commits(
1975 this: Entity<Self>,
1976 envelope: TypedEnvelope<proto::CheckForPushedCommits>,
1977 mut cx: AsyncApp,
1978 ) -> Result<proto::CheckForPushedCommitsResponse> {
1979 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1980 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1981
1982 let branches = repository_handle
1983 .update(&mut cx, |repository_handle, _| {
1984 repository_handle.check_for_pushed_commits()
1985 })?
1986 .await??;
1987 Ok(proto::CheckForPushedCommitsResponse {
1988 pushed_to: branches
1989 .into_iter()
1990 .map(|branch| branch.to_string())
1991 .collect(),
1992 })
1993 }
1994
1995 async fn handle_git_diff(
1996 this: Entity<Self>,
1997 envelope: TypedEnvelope<proto::GitDiff>,
1998 mut cx: AsyncApp,
1999 ) -> Result<proto::GitDiffResponse> {
2000 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2001 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2002 let diff_type = match envelope.payload.diff_type() {
2003 proto::git_diff::DiffType::HeadToIndex => DiffType::HeadToIndex,
2004 proto::git_diff::DiffType::HeadToWorktree => DiffType::HeadToWorktree,
2005 };
2006
2007 let mut diff = repository_handle
2008 .update(&mut cx, |repository_handle, cx| {
2009 repository_handle.diff(diff_type, cx)
2010 })?
2011 .await??;
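// Cap the diff payload sent over RPC at roughly one megabyte. Note that the
// truncation below counts characters, so the byte length can exceed one
// megabyte for non-ASCII content.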
2012 const ONE_MB: usize = 1_000_000;
2013 if diff.len() > ONE_MB {
2014 diff = diff.chars().take(ONE_MB).collect()
2015 }
2016
2017 Ok(proto::GitDiffResponse { diff })
2018 }
2019
2020 async fn handle_open_unstaged_diff(
2021 this: Entity<Self>,
2022 request: TypedEnvelope<proto::OpenUnstagedDiff>,
2023 mut cx: AsyncApp,
2024 ) -> Result<proto::OpenUnstagedDiffResponse> {
2025 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2026 let diff = this
2027 .update(&mut cx, |this, cx| {
2028 let buffer = this.buffer_store.read(cx).get(buffer_id)?;
2029 Some(this.open_unstaged_diff(buffer, cx))
2030 })?
2031 .context("missing buffer")?
2032 .await?;
2033 this.update(&mut cx, |this, _| {
2034 let shared_diffs = this
2035 .shared_diffs
2036 .entry(request.original_sender_id.unwrap_or(request.sender_id))
2037 .or_default();
2038 shared_diffs.entry(buffer_id).or_default().unstaged = Some(diff.clone());
2039 })?;
2040 let staged_text = diff.read_with(&cx, |diff, _| diff.base_text_string())?;
2041 Ok(proto::OpenUnstagedDiffResponse { staged_text })
2042 }
2043
2044 async fn handle_open_uncommitted_diff(
2045 this: Entity<Self>,
2046 request: TypedEnvelope<proto::OpenUncommittedDiff>,
2047 mut cx: AsyncApp,
2048 ) -> Result<proto::OpenUncommittedDiffResponse> {
2049 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2050 let diff = this
2051 .update(&mut cx, |this, cx| {
2052 let buffer = this.buffer_store.read(cx).get(buffer_id)?;
2053 Some(this.open_uncommitted_diff(buffer, cx))
2054 })?
2055 .context("missing buffer")?
2056 .await?;
2057 this.update(&mut cx, |this, _| {
2058 let shared_diffs = this
2059 .shared_diffs
2060 .entry(request.original_sender_id.unwrap_or(request.sender_id))
2061 .or_default();
2062 shared_diffs.entry(buffer_id).or_default().uncommitted = Some(diff.clone());
2063 })?;
2064 diff.read_with(&cx, |diff, cx| {
2065 use proto::open_uncommitted_diff_response::Mode;
2066
2067 let unstaged_diff = diff.secondary_diff();
2068 let index_snapshot = unstaged_diff.and_then(|diff| {
2069 let diff = diff.read(cx);
2070 diff.base_text_exists().then(|| diff.base_text())
2071 });
2072
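// Encode how much base text needs to be sent back: when the index matches
// HEAD, only the committed text is sent and the client reuses it as the
// staged text; otherwise both texts are sent (either of which may be absent).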
2073 let mode;
2074 let staged_text;
2075 let committed_text;
2076 if diff.base_text_exists() {
2077 let committed_snapshot = diff.base_text();
2078 committed_text = Some(committed_snapshot.text());
2079 if let Some(index_text) = index_snapshot {
2080 if index_text.remote_id() == committed_snapshot.remote_id() {
2081 mode = Mode::IndexMatchesHead;
2082 staged_text = None;
2083 } else {
2084 mode = Mode::IndexAndHead;
2085 staged_text = Some(index_text.text());
2086 }
2087 } else {
2088 mode = Mode::IndexAndHead;
2089 staged_text = None;
2090 }
2091 } else {
2092 mode = Mode::IndexAndHead;
2093 committed_text = None;
2094 staged_text = index_snapshot.as_ref().map(|buffer| buffer.text());
2095 }
2096
2097 proto::OpenUncommittedDiffResponse {
2098 committed_text,
2099 staged_text,
2100 mode: mode.into(),
2101 }
2102 })
2103 }
2104
2105 async fn handle_update_diff_bases(
2106 this: Entity<Self>,
2107 request: TypedEnvelope<proto::UpdateDiffBases>,
2108 mut cx: AsyncApp,
2109 ) -> Result<()> {
2110 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2111 this.update(&mut cx, |this, cx| {
2112 if let Some(diff_state) = this.diffs.get_mut(&buffer_id) {
2113 if let Some(buffer) = this.buffer_store.read(cx).get(buffer_id) {
2114 let buffer = buffer.read(cx).text_snapshot();
2115 diff_state.update(cx, |diff_state, cx| {
2116 diff_state.handle_base_texts_updated(buffer, request.payload, cx);
2117 })
2118 }
2119 }
2120 })
2121 }
2122
2123 async fn handle_blame_buffer(
2124 this: Entity<Self>,
2125 envelope: TypedEnvelope<proto::BlameBuffer>,
2126 mut cx: AsyncApp,
2127 ) -> Result<proto::BlameBufferResponse> {
2128 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2129 let version = deserialize_version(&envelope.payload.version);
2130 let buffer = this.read_with(&cx, |this, cx| {
2131 this.buffer_store.read(cx).get_existing(buffer_id)
2132 })??;
2133 buffer
2134 .update(&mut cx, |buffer, _| {
2135 buffer.wait_for_version(version.clone())
2136 })?
2137 .await?;
2138 let blame = this
2139 .update(&mut cx, |this, cx| {
2140 this.blame_buffer(&buffer, Some(version), cx)
2141 })?
2142 .await?;
2143 Ok(serialize_blame_buffer_response(blame))
2144 }
2145
2146 async fn handle_get_permalink_to_line(
2147 this: Entity<Self>,
2148 envelope: TypedEnvelope<proto::GetPermalinkToLine>,
2149 mut cx: AsyncApp,
2150 ) -> Result<proto::GetPermalinkToLineResponse> {
2151 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2153 let selection = {
2154 let proto_selection = envelope
2155 .payload
2156 .selection
2157 .context("no selection provided to get permalink for")?;
2158 proto_selection.start as u32..proto_selection.end as u32
2159 };
2160 let buffer = this.read_with(&cx, |this, cx| {
2161 this.buffer_store.read(cx).get_existing(buffer_id)
2162 })??;
2163 let permalink = this
2164 .update(&mut cx, |this, cx| {
2165 this.get_permalink_to_line(&buffer, selection, cx)
2166 })?
2167 .await?;
2168 Ok(proto::GetPermalinkToLineResponse {
2169 permalink: permalink.to_string(),
2170 })
2171 }
2172
2173 fn repository_for_request(
2174 this: &Entity<Self>,
2175 id: RepositoryId,
2176 cx: &mut AsyncApp,
2177 ) -> Result<Entity<Repository>> {
2178 this.read_with(cx, |this, _| {
2179 this.repositories
2180 .get(&id)
2181 .context("missing repository handle")
2182 .cloned()
2183 })?
2184 }
2185
2186 pub fn repo_snapshots(&self, cx: &App) -> HashMap<RepositoryId, RepositorySnapshot> {
2187 self.repositories
2188 .iter()
2189 .map(|(id, repo)| (*id, repo.read(cx).snapshot.clone()))
2190 .collect()
2191 }
2192}
2193
2194impl BufferGitState {
2195 fn new(_git_store: WeakEntity<GitStore>) -> Self {
2196 Self {
2197 unstaged_diff: Default::default(),
2198 uncommitted_diff: Default::default(),
2199 recalculate_diff_task: Default::default(),
2200 language: Default::default(),
2201 language_registry: Default::default(),
2202 recalculating_tx: postage::watch::channel_with(false).0,
2203 hunk_staging_operation_count: 0,
2204 hunk_staging_operation_count_as_of_write: 0,
2205 head_text: Default::default(),
2206 index_text: Default::default(),
2207 head_changed: Default::default(),
2208 index_changed: Default::default(),
2209 language_changed: Default::default(),
2210 conflict_updated_futures: Default::default(),
2211 conflict_set: Default::default(),
2212 reparse_conflict_markers_task: Default::default(),
2213 }
2214 }
2215
2216 fn buffer_language_changed(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) {
2217 self.language = buffer.read(cx).language().cloned();
2218 self.language_changed = true;
2219 self.recalculate_diffs(buffer.read(cx).text_snapshot(), cx);
2220 }
2221
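/// Re-parses this buffer's conflict markers on a background task and pushes
/// the resulting snapshot into the associated `ConflictSet`. The returned
/// receiver fires once the update has been applied; if there is no live
/// conflict set, or no conflict was previously detected, the sender is simply
/// dropped.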
2222 fn reparse_conflict_markers(
2223 &mut self,
2224 buffer: text::BufferSnapshot,
2225 cx: &mut Context<Self>,
2226 ) -> oneshot::Receiver<()> {
2227 let (tx, rx) = oneshot::channel();
2228
2229 let Some(conflict_set) = self
2230 .conflict_set
2231 .as_ref()
2232 .and_then(|conflict_set| conflict_set.upgrade())
2233 else {
2234 return rx;
2235 };
2236
2237 let old_snapshot = conflict_set.read_with(cx, |conflict_set, _| {
2238 if conflict_set.has_conflict {
2239 Some(conflict_set.snapshot())
2240 } else {
2241 None
2242 }
2243 });
2244
2245 if let Some(old_snapshot) = old_snapshot {
2246 self.conflict_updated_futures.push(tx);
2247 self.reparse_conflict_markers_task = Some(cx.spawn(async move |this, cx| {
2248 let (snapshot, changed_range) = cx
2249 .background_spawn(async move {
2250 let new_snapshot = ConflictSet::parse(&buffer);
2251 let changed_range = old_snapshot.compare(&new_snapshot, &buffer);
2252 (new_snapshot, changed_range)
2253 })
2254 .await;
2255 this.update(cx, |this, cx| {
2256 if let Some(conflict_set) = &this.conflict_set {
2257 conflict_set
2258 .update(cx, |conflict_set, cx| {
2259 conflict_set.set_snapshot(snapshot, changed_range, cx);
2260 })
2261 .ok();
2262 }
2263 let futures = std::mem::take(&mut this.conflict_updated_futures);
2264 for tx in futures {
2265 tx.send(()).ok();
2266 }
2267 })
2268 }))
2269 }
2270
2271 rx
2272 }
2273
2274 fn unstaged_diff(&self) -> Option<Entity<BufferDiff>> {
2275 self.unstaged_diff.as_ref().and_then(|set| set.upgrade())
2276 }
2277
2278 fn uncommitted_diff(&self) -> Option<Entity<BufferDiff>> {
2279 self.uncommitted_diff.as_ref().and_then(|set| set.upgrade())
2280 }
2281
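/// Translates a `proto::UpdateDiffBases` message into the corresponding
/// `DiffBasesChange` and triggers a diff recalculation for the buffer.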
2282 fn handle_base_texts_updated(
2283 &mut self,
2284 buffer: text::BufferSnapshot,
2285 message: proto::UpdateDiffBases,
2286 cx: &mut Context<Self>,
2287 ) {
2288 use proto::update_diff_bases::Mode;
2289
2290 let Some(mode) = Mode::from_i32(message.mode) else {
2291 return;
2292 };
2293
2294 let diff_bases_change = match mode {
2295 Mode::HeadOnly => DiffBasesChange::SetHead(message.committed_text),
2296 Mode::IndexOnly => DiffBasesChange::SetIndex(message.staged_text),
2297 Mode::IndexMatchesHead => DiffBasesChange::SetBoth(message.committed_text),
2298 Mode::IndexAndHead => DiffBasesChange::SetEach {
2299 index: message.staged_text,
2300 head: message.committed_text,
2301 },
2302 };
2303
2304 self.diff_bases_changed(buffer, Some(diff_bases_change), cx);
2305 }
2306
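/// Returns a future that resolves once the in-flight diff recalculation has
/// finished, or `None` if no recalculation is currently running.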
2307 pub fn wait_for_recalculation(&mut self) -> Option<impl Future<Output = ()> + use<>> {
2308 if *self.recalculating_tx.borrow() {
2309 let mut rx = self.recalculating_tx.subscribe();
2310 return Some(async move {
2311 loop {
2312 let is_recalculating = rx.recv().await;
2313 if is_recalculating != Some(true) {
2314 break;
2315 }
2316 }
2317 });
2318 } else {
2319 None
2320 }
2321 }
2322
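/// Applies a change to the cached head and/or index base texts (normalizing
/// line endings), marks the corresponding dirty flags, and kicks off a diff
/// recalculation against the given buffer snapshot.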
2323 fn diff_bases_changed(
2324 &mut self,
2325 buffer: text::BufferSnapshot,
2326 diff_bases_change: Option<DiffBasesChange>,
2327 cx: &mut Context<Self>,
2328 ) {
2329 match diff_bases_change {
2330 Some(DiffBasesChange::SetIndex(index)) => {
2331 self.index_text = index.map(|mut index| {
2332 text::LineEnding::normalize(&mut index);
2333 Arc::new(index)
2334 });
2335 self.index_changed = true;
2336 }
2337 Some(DiffBasesChange::SetHead(head)) => {
2338 self.head_text = head.map(|mut head| {
2339 text::LineEnding::normalize(&mut head);
2340 Arc::new(head)
2341 });
2342 self.head_changed = true;
2343 }
2344 Some(DiffBasesChange::SetBoth(text)) => {
2345 let text = text.map(|mut text| {
2346 text::LineEnding::normalize(&mut text);
2347 Arc::new(text)
2348 });
2349 self.head_text = text.clone();
2350 self.index_text = text;
2351 self.head_changed = true;
2352 self.index_changed = true;
2353 }
2354 Some(DiffBasesChange::SetEach { index, head }) => {
2355 self.index_text = index.map(|mut index| {
2356 text::LineEnding::normalize(&mut index);
2357 Arc::new(index)
2358 });
2359 self.index_changed = true;
2360 self.head_text = head.map(|mut head| {
2361 text::LineEnding::normalize(&mut head);
2362 Arc::new(head)
2363 });
2364 self.head_changed = true;
2365 }
2366 None => {}
2367 }
2368
2369 self.recalculate_diffs(buffer, cx)
2370 }
2371
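/// Recomputes the unstaged and uncommitted diffs for this buffer on a
/// background task. When the index text matches HEAD, the unstaged snapshot
/// is reused for the uncommitted diff instead of being computed twice. The
/// task cancels itself when hunk staging operations are still in flight; see
/// the comment on the cancellation check below.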
2372 fn recalculate_diffs(&mut self, buffer: text::BufferSnapshot, cx: &mut Context<Self>) {
2373 *self.recalculating_tx.borrow_mut() = true;
2374
2375 let language = self.language.clone();
2376 let language_registry = self.language_registry.clone();
2377 let unstaged_diff = self.unstaged_diff();
2378 let uncommitted_diff = self.uncommitted_diff();
2379 let head = self.head_text.clone();
2380 let index = self.index_text.clone();
2381 let index_changed = self.index_changed;
2382 let head_changed = self.head_changed;
2383 let language_changed = self.language_changed;
2384 let prev_hunk_staging_operation_count = self.hunk_staging_operation_count_as_of_write;
2385 let index_matches_head = match (self.index_text.as_ref(), self.head_text.as_ref()) {
2386 (Some(index), Some(head)) => Arc::ptr_eq(index, head),
2387 (None, None) => true,
2388 _ => false,
2389 };
2390 self.recalculate_diff_task = Some(cx.spawn(async move |this, cx| {
2391 log::debug!(
2392 "start recalculating diffs for buffer {}",
2393 buffer.remote_id()
2394 );
2395
2396 let mut new_unstaged_diff = None;
2397 if let Some(unstaged_diff) = &unstaged_diff {
2398 new_unstaged_diff = Some(
2399 BufferDiff::update_diff(
2400 unstaged_diff.clone(),
2401 buffer.clone(),
2402 index,
2403 index_changed,
2404 language_changed,
2405 language.clone(),
2406 language_registry.clone(),
2407 cx,
2408 )
2409 .await?,
2410 );
2411 }
2412
2413 let mut new_uncommitted_diff = None;
2414 if let Some(uncommitted_diff) = &uncommitted_diff {
2415 new_uncommitted_diff = if index_matches_head {
2416 new_unstaged_diff.clone()
2417 } else {
2418 Some(
2419 BufferDiff::update_diff(
2420 uncommitted_diff.clone(),
2421 buffer.clone(),
2422 head,
2423 head_changed,
2424 language_changed,
2425 language.clone(),
2426 language_registry.clone(),
2427 cx,
2428 )
2429 .await?,
2430 )
2431 }
2432 }
2433
2434 let cancel = this.update(cx, |this, _| {
2435 // This checks whether all pending stage/unstage operations
2436 // have quiesced (i.e. both the corresponding write and the
2437 // read of that write have completed). If not, then we cancel
2438 // this recalculation attempt to avoid invalidating pending
2439 // state too quickly; another recalculation will come along
2440 // later and clear the pending state once the state of the index has settled.
2441 if this.hunk_staging_operation_count > prev_hunk_staging_operation_count {
2442 *this.recalculating_tx.borrow_mut() = false;
2443 true
2444 } else {
2445 false
2446 }
2447 })?;
2448 if cancel {
2449 log::debug!(
2450 concat!(
2451 "aborting recalculating diffs for buffer {} ",
2452 "due to subsequent hunk operations",
2453 ),
2454 buffer.remote_id()
2455 );
2456 return Ok(());
2457 }
2458
2459 let unstaged_changed_range = if let Some((unstaged_diff, new_unstaged_diff)) =
2460 unstaged_diff.as_ref().zip(new_unstaged_diff.clone())
2461 {
2462 unstaged_diff.update(cx, |diff, cx| {
2463 if language_changed {
2464 diff.language_changed(cx);
2465 }
2466 diff.set_snapshot(new_unstaged_diff, &buffer, cx)
2467 })?
2468 } else {
2469 None
2470 };
2471
2472 if let Some((uncommitted_diff, new_uncommitted_diff)) =
2473 uncommitted_diff.as_ref().zip(new_uncommitted_diff.clone())
2474 {
2475 uncommitted_diff.update(cx, |diff, cx| {
2476 if language_changed {
2477 diff.language_changed(cx);
2478 }
2479 diff.set_snapshot_with_secondary(
2480 new_uncommitted_diff,
2481 &buffer,
2482 unstaged_changed_range,
2483 true,
2484 cx,
2485 );
2486 })?;
2487 }
2488
2489 log::debug!(
2490 "finished recalculating diffs for buffer {}",
2491 buffer.remote_id()
2492 );
2493
2494 if let Some(this) = this.upgrade() {
2495 this.update(cx, |this, _| {
2496 this.index_changed = false;
2497 this.head_changed = false;
2498 this.language_changed = false;
2499 *this.recalculating_tx.borrow_mut() = false;
2500 })?;
2501 }
2502
2503 Ok(())
2504 }));
2505 }
2506}
2507
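/// Builds an `AskPassDelegate` that forwards authentication prompts from a
/// local git command to the downstream client via `proto::AskPassRequest`,
/// relaying the client's response back to the command.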
2508fn make_remote_delegate(
2509 this: Entity<GitStore>,
2510 project_id: u64,
2511 repository_id: RepositoryId,
2512 askpass_id: u64,
2513 cx: &mut AsyncApp,
2514) -> AskPassDelegate {
2515 AskPassDelegate::new(cx, move |prompt, tx, cx| {
2516 this.update(cx, |this, cx| {
2517 let Some((client, _)) = this.downstream_client() else {
2518 return;
2519 };
2520 let response = client.request(proto::AskPassRequest {
2521 project_id,
2522 repository_id: repository_id.to_proto(),
2523 askpass_id,
2524 prompt,
2525 });
2526 cx.spawn(async move |_, _| {
2527 tx.send(response.await?.response).ok();
2528 anyhow::Ok(())
2529 })
2530 .detach_and_log_err(cx);
2531 })
2532 .log_err();
2533 })
2534}
2535
2536impl RepositoryId {
2537 pub fn to_proto(self) -> u64 {
2538 self.0
2539 }
2540
2541 pub fn from_proto(id: u64) -> Self {
2542 RepositoryId(id)
2543 }
2544}
2545
2546impl RepositorySnapshot {
2547 fn empty(id: RepositoryId, work_directory_abs_path: Arc<Path>) -> Self {
2548 Self {
2549 id,
2550 statuses_by_path: Default::default(),
2551 work_directory_abs_path,
2552 branch: None,
2553 head_commit: None,
2554 scan_id: 0,
2555 merge: Default::default(),
2556 }
2557 }
2558
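/// Builds the `proto::UpdateRepository` message that describes this snapshot
/// in full, intended for a client that has no previous state.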
2559 fn initial_update(&self, project_id: u64) -> proto::UpdateRepository {
2560 proto::UpdateRepository {
2561 branch_summary: self.branch.as_ref().map(branch_to_proto),
2562 head_commit_details: self.head_commit.as_ref().map(commit_details_to_proto),
2563 updated_statuses: self
2564 .statuses_by_path
2565 .iter()
2566 .map(|entry| entry.to_proto())
2567 .collect(),
2568 removed_statuses: Default::default(),
2569 current_merge_conflicts: self
2570 .merge
2571 .conflicted_paths
2572 .iter()
2573 .map(|repo_path| repo_path.to_proto())
2574 .collect(),
2575 project_id,
2576 id: self.id.to_proto(),
2577 abs_path: self.work_directory_abs_path.to_proto(),
2578 entry_ids: vec![self.id.to_proto()],
2579 scan_id: self.scan_id,
2580 is_last_update: true,
2581 }
2582 }
2583
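/// Builds an incremental `proto::UpdateRepository` by walking the old and new
/// status trees in parallel (both are ordered by repo path, so this is a
/// merge join): entries only in `self` are reported as updated, entries only
/// in `old` as removed, and entries present in both are reported only when
/// their status changed.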
2584 fn build_update(&self, old: &Self, project_id: u64) -> proto::UpdateRepository {
2585 let mut updated_statuses: Vec<proto::StatusEntry> = Vec::new();
2586 let mut removed_statuses: Vec<String> = Vec::new();
2587
2588 let mut new_statuses = self.statuses_by_path.iter().peekable();
2589 let mut old_statuses = old.statuses_by_path.iter().peekable();
2590
2591 let mut current_new_entry = new_statuses.next();
2592 let mut current_old_entry = old_statuses.next();
2593 loop {
2594 match (current_new_entry, current_old_entry) {
2595 (Some(new_entry), Some(old_entry)) => {
2596 match new_entry.repo_path.cmp(&old_entry.repo_path) {
2597 Ordering::Less => {
2598 updated_statuses.push(new_entry.to_proto());
2599 current_new_entry = new_statuses.next();
2600 }
2601 Ordering::Equal => {
2602 if new_entry.status != old_entry.status {
2603 updated_statuses.push(new_entry.to_proto());
2604 }
2605 current_old_entry = old_statuses.next();
2606 current_new_entry = new_statuses.next();
2607 }
2608 Ordering::Greater => {
2609 removed_statuses.push(old_entry.repo_path.as_ref().to_proto());
2610 current_old_entry = old_statuses.next();
2611 }
2612 }
2613 }
2614 (None, Some(old_entry)) => {
2615 removed_statuses.push(old_entry.repo_path.as_ref().to_proto());
2616 current_old_entry = old_statuses.next();
2617 }
2618 (Some(new_entry), None) => {
2619 updated_statuses.push(new_entry.to_proto());
2620 current_new_entry = new_statuses.next();
2621 }
2622 (None, None) => break,
2623 }
2624 }
2625
2626 proto::UpdateRepository {
2627 branch_summary: self.branch.as_ref().map(branch_to_proto),
2628 head_commit_details: self.head_commit.as_ref().map(commit_details_to_proto),
2629 updated_statuses,
2630 removed_statuses,
2631 current_merge_conflicts: self
2632 .merge
2633 .conflicted_paths
2634 .iter()
2635 .map(|path| path.as_ref().to_proto())
2636 .collect(),
2637 project_id,
2638 id: self.id.to_proto(),
2639 abs_path: self.work_directory_abs_path.to_proto(),
2640 entry_ids: vec![],
2641 scan_id: self.scan_id,
2642 is_last_update: true,
2643 }
2644 }
2645
2646 pub fn status(&self) -> impl Iterator<Item = StatusEntry> + '_ {
2647 self.statuses_by_path.iter().cloned()
2648 }
2649
2650 pub fn status_summary(&self) -> GitSummary {
2651 self.statuses_by_path.summary().item_summary
2652 }
2653
2654 pub fn status_for_path(&self, path: &RepoPath) -> Option<StatusEntry> {
2655 self.statuses_by_path
2656 .get(&PathKey(path.0.clone()), &())
2657 .cloned()
2658 }
2659
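/// Converts an absolute path into a path relative to this repository's work
/// directory, returning `None` if the path lies outside of it.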
2660 pub fn abs_path_to_repo_path(&self, abs_path: &Path) -> Option<RepoPath> {
2661 abs_path
2662 .strip_prefix(&self.work_directory_abs_path)
2663 .map(RepoPath::from)
2664 .ok()
2665 }
2666
2667 pub fn had_conflict_on_last_merge_head_change(&self, repo_path: &RepoPath) -> bool {
2668 self.merge.conflicted_paths.contains(repo_path)
2669 }
2670
2671 pub fn has_conflict(&self, repo_path: &RepoPath) -> bool {
2672 let had_conflict_on_last_merge_head_change =
2673 self.merge.conflicted_paths.contains(repo_path);
2674 let has_conflict_currently = self
2675 .status_for_path(repo_path)
2676 .is_some_and(|entry| entry.status.is_conflicted());
2677 had_conflict_on_last_merge_head_change || has_conflict_currently
2678 }
2679
2680 /// This is the name that will be displayed in the repository selector for this repository.
2681 pub fn display_name(&self) -> SharedString {
2682 self.work_directory_abs_path
2683 .file_name()
2684 .unwrap_or_default()
2685 .to_string_lossy()
2686 .to_string()
2687 .into()
2688 }
2689}
2690
2691impl MergeDetails {
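/// Reloads the merge state by resolving the special refs (MERGE_HEAD,
/// CHERRY_PICK_HEAD, REBASE_HEAD, REVERT_HEAD, APPLY_HEAD). If the set of
/// merge heads changed, the conflicted paths are recomputed from the current
/// status (unless the merge just started and no conflicts are visible yet);
/// otherwise the previous set is carried over. Returns the new details along
/// with whether the merge heads changed.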
2692 async fn load(
2693 backend: &Arc<dyn GitRepository>,
2694 status: &SumTree<StatusEntry>,
2695 prev_snapshot: &RepositorySnapshot,
2696 ) -> Result<(MergeDetails, bool)> {
2697 log::debug!("load merge details");
2698 let message = backend.merge_message().await;
2699 let heads = backend
2700 .revparse_batch(vec![
2701 "MERGE_HEAD".into(),
2702 "CHERRY_PICK_HEAD".into(),
2703 "REBASE_HEAD".into(),
2704 "REVERT_HEAD".into(),
2705 "APPLY_HEAD".into(),
2706 ])
2707 .await
2708 .log_err()
2709 .unwrap_or_default()
2710 .into_iter()
2711 .map(|opt| opt.map(SharedString::from))
2712 .collect::<Vec<_>>();
2713 let merge_heads_changed = heads != prev_snapshot.merge.heads;
2714 let conflicted_paths = if merge_heads_changed {
2715 let current_conflicted_paths = TreeSet::from_ordered_entries(
2716 status
2717 .iter()
2718 .filter(|entry| entry.status.is_conflicted())
2719 .map(|entry| entry.repo_path.clone()),
2720 );
2721
2722 // A scan can run while a lengthy merge is still in progress, after the
2723 // merge heads appear but before the resulting conflicts are reported by
2724 // `git status`. Since, for the moment, we only track the merge heads state
2725 // for the purposes of tracking conflicts, don't update this state until we
2726 // actually see some conflicts.
2727 if heads.iter().any(Option::is_some)
2728 && !prev_snapshot.merge.heads.iter().any(Option::is_some)
2729 && current_conflicted_paths.is_empty()
2730 {
2731 log::debug!("not updating merge heads because no conflicts found");
2732 return Ok((
2733 MergeDetails {
2734 message: message.map(SharedString::from),
2735 ..prev_snapshot.merge.clone()
2736 },
2737 false,
2738 ));
2739 }
2740
2741 current_conflicted_paths
2742 } else {
2743 prev_snapshot.merge.conflicted_paths.clone()
2744 };
2745 let details = MergeDetails {
2746 conflicted_paths,
2747 message: message.map(SharedString::from),
2748 heads,
2749 };
2750 Ok((details, merge_heads_changed))
2751 }
2752}
2753
2754impl Repository {
2755 pub fn snapshot(&self) -> RepositorySnapshot {
2756 self.snapshot.clone()
2757 }
2758
2759 fn local(
2760 id: RepositoryId,
2761 work_directory_abs_path: Arc<Path>,
2762 dot_git_abs_path: Arc<Path>,
2763 repository_dir_abs_path: Arc<Path>,
2764 common_dir_abs_path: Arc<Path>,
2765 project_environment: WeakEntity<ProjectEnvironment>,
2766 fs: Arc<dyn Fs>,
2767 git_store: WeakEntity<GitStore>,
2768 cx: &mut Context<Self>,
2769 ) -> Self {
2770 let snapshot = RepositorySnapshot::empty(id, work_directory_abs_path.clone());
2771 Repository {
2772 this: cx.weak_entity(),
2773 git_store,
2774 snapshot,
2775 commit_message_buffer: None,
2776 askpass_delegates: Default::default(),
2777 paths_needing_status_update: Default::default(),
2778 latest_askpass_id: 0,
2779 job_sender: Repository::spawn_local_git_worker(
2780 work_directory_abs_path,
2781 dot_git_abs_path,
2782 repository_dir_abs_path,
2783 common_dir_abs_path,
2784 project_environment,
2785 fs,
2786 cx,
2787 ),
2788 job_id: 0,
2789 active_jobs: Default::default(),
2790 }
2791 }
2792
2793 fn remote(
2794 id: RepositoryId,
2795 work_directory_abs_path: Arc<Path>,
2796 project_id: ProjectId,
2797 client: AnyProtoClient,
2798 git_store: WeakEntity<GitStore>,
2799 cx: &mut Context<Self>,
2800 ) -> Self {
2801 let snapshot = RepositorySnapshot::empty(id, work_directory_abs_path);
2802 Self {
2803 this: cx.weak_entity(),
2804 snapshot,
2805 commit_message_buffer: None,
2806 git_store,
2807 paths_needing_status_update: Default::default(),
2808 job_sender: Self::spawn_remote_git_worker(project_id, client, cx),
2809 askpass_delegates: Default::default(),
2810 latest_askpass_id: 0,
2811 active_jobs: Default::default(),
2812 job_id: 0,
2813 }
2814 }
2815
2816 pub fn git_store(&self) -> Option<Entity<GitStore>> {
2817 self.git_store.upgrade()
2818 }
2819
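/// For every open buffer that belongs to this repository, re-reads the index
/// and committed texts from the local backend, determines what changed,
/// forwards the new base texts to any downstream client, and updates the
/// buffer's diff state. Only meaningful for local repositories.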
2820 fn reload_buffer_diff_bases(&mut self, cx: &mut Context<Self>) {
2821 let this = cx.weak_entity();
2822 let git_store = self.git_store.clone();
2823 let _ = self.send_keyed_job(
2824 Some(GitJobKey::ReloadBufferDiffBases),
2825 None,
2826 |state, mut cx| async move {
2827 let RepositoryState::Local { backend, .. } = state else {
2828 log::error!("tried to recompute diffs for a non-local repository");
2829 return Ok(());
2830 };
2831
2832 let Some(this) = this.upgrade() else {
2833 return Ok(());
2834 };
2835
2836 let repo_diff_state_updates = this.update(&mut cx, |this, cx| {
2837 git_store.update(cx, |git_store, cx| {
2838 git_store
2839 .diffs
2840 .iter()
2841 .filter_map(|(buffer_id, diff_state)| {
2842 let buffer_store = git_store.buffer_store.read(cx);
2843 let buffer = buffer_store.get(*buffer_id)?;
2844 let file = File::from_dyn(buffer.read(cx).file())?;
2845 let abs_path =
2846 file.worktree.read(cx).absolutize(&file.path).ok()?;
2847 let repo_path = this.abs_path_to_repo_path(&abs_path)?;
2848 log::debug!(
2849 "start reload diff bases for repo path {}",
2850 repo_path.0.display()
2851 );
2852 diff_state.update(cx, |diff_state, _| {
2853 let has_unstaged_diff = diff_state
2854 .unstaged_diff
2855 .as_ref()
2856 .is_some_and(|diff| diff.is_upgradable());
2857 let has_uncommitted_diff = diff_state
2858 .uncommitted_diff
2859 .as_ref()
2860 .is_some_and(|set| set.is_upgradable());
2861
2862 Some((
2863 buffer,
2864 repo_path,
2865 has_unstaged_diff.then(|| diff_state.index_text.clone()),
2866 has_uncommitted_diff.then(|| diff_state.head_text.clone()),
2867 ))
2868 })
2869 })
2870 .collect::<Vec<_>>()
2871 })
2872 })??;
2873
2874 let buffer_diff_base_changes = cx
2875 .background_spawn(async move {
2876 let mut changes = Vec::new();
2877 for (buffer, repo_path, current_index_text, current_head_text) in
2878 &repo_diff_state_updates
2879 {
2880 let index_text = if current_index_text.is_some() {
2881 backend.load_index_text(repo_path.clone()).await
2882 } else {
2883 None
2884 };
2885 let head_text = if current_head_text.is_some() {
2886 backend.load_committed_text(repo_path.clone()).await
2887 } else {
2888 None
2889 };
2890
2891 let change =
2892 match (current_index_text.as_ref(), current_head_text.as_ref()) {
2893 (Some(current_index), Some(current_head)) => {
2894 let index_changed =
2895 index_text.as_ref() != current_index.as_deref();
2896 let head_changed =
2897 head_text.as_ref() != current_head.as_deref();
2898 if index_changed && head_changed {
2899 if index_text == head_text {
2900 Some(DiffBasesChange::SetBoth(head_text))
2901 } else {
2902 Some(DiffBasesChange::SetEach {
2903 index: index_text,
2904 head: head_text,
2905 })
2906 }
2907 } else if index_changed {
2908 Some(DiffBasesChange::SetIndex(index_text))
2909 } else if head_changed {
2910 Some(DiffBasesChange::SetHead(head_text))
2911 } else {
2912 None
2913 }
2914 }
2915 (Some(current_index), None) => {
2916 let index_changed =
2917 index_text.as_ref() != current_index.as_deref();
2918 index_changed
2919 .then_some(DiffBasesChange::SetIndex(index_text))
2920 }
2921 (None, Some(current_head)) => {
2922 let head_changed =
2923 head_text.as_ref() != current_head.as_deref();
2924 head_changed.then_some(DiffBasesChange::SetHead(head_text))
2925 }
2926 (None, None) => None,
2927 };
2928
2929 changes.push((buffer.clone(), change))
2930 }
2931 changes
2932 })
2933 .await;
2934
2935 git_store.update(&mut cx, |git_store, cx| {
2936 for (buffer, diff_bases_change) in buffer_diff_base_changes {
2937 let buffer_snapshot = buffer.read(cx).text_snapshot();
2938 let buffer_id = buffer_snapshot.remote_id();
2939 let Some(diff_state) = git_store.diffs.get(&buffer_id) else {
2940 continue;
2941 };
2942
2943 let downstream_client = git_store.downstream_client();
2944 diff_state.update(cx, |diff_state, cx| {
2945 use proto::update_diff_bases::Mode;
2946
2947 if let Some((diff_bases_change, (client, project_id))) =
2948 diff_bases_change.clone().zip(downstream_client)
2949 {
2950 let (staged_text, committed_text, mode) = match diff_bases_change {
2951 DiffBasesChange::SetIndex(index) => {
2952 (index, None, Mode::IndexOnly)
2953 }
2954 DiffBasesChange::SetHead(head) => (None, head, Mode::HeadOnly),
2955 DiffBasesChange::SetEach { index, head } => {
2956 (index, head, Mode::IndexAndHead)
2957 }
2958 DiffBasesChange::SetBoth(text) => {
2959 (None, text, Mode::IndexMatchesHead)
2960 }
2961 };
2962 client
2963 .send(proto::UpdateDiffBases {
2964 project_id: project_id.to_proto(),
2965 buffer_id: buffer_id.to_proto(),
2966 staged_text,
2967 committed_text,
2968 mode: mode as i32,
2969 })
2970 .log_err();
2971 }
2972
2973 diff_state.diff_bases_changed(buffer_snapshot, diff_bases_change, cx);
2974 });
2975 }
2976 })
2977 },
2978 );
2979 }
2980
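/// Enqueues a job on this repository's git worker and returns a receiver for
/// its result. The optional status message is recorded in `active_jobs` for
/// the duration of the job.
///
/// A rough usage sketch, mirroring the callers in this file (illustrative
/// only, not compiled as a doctest):
///
/// ```ignore
/// let rx = repository.send_job(Some("git branch --list".into()), move |state, _cx| async move {
///     match state {
///         RepositoryState::Local { backend, .. } => backend.branches().await,
///         RepositoryState::Remote { .. } => anyhow::bail!("expected a local repository"),
///     }
/// });
/// // `rx.await` yields `Result<Result<Vec<Branch>>, Canceled>`.
/// ```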
2981 pub fn send_job<F, Fut, R>(
2982 &mut self,
2983 status: Option<SharedString>,
2984 job: F,
2985 ) -> oneshot::Receiver<R>
2986 where
2987 F: FnOnce(RepositoryState, AsyncApp) -> Fut + 'static,
2988 Fut: Future<Output = R> + 'static,
2989 R: Send + 'static,
2990 {
2991 self.send_keyed_job(None, status, job)
2992 }
2993
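/// Like `send_job`, but tags the job with an optional `GitJobKey` (for
/// example, index writes for a given path), which the worker can presumably
/// use to coalesce redundant jobs that share a key.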
2994 fn send_keyed_job<F, Fut, R>(
2995 &mut self,
2996 key: Option<GitJobKey>,
2997 status: Option<SharedString>,
2998 job: F,
2999 ) -> oneshot::Receiver<R>
3000 where
3001 F: FnOnce(RepositoryState, AsyncApp) -> Fut + 'static,
3002 Fut: Future<Output = R> + 'static,
3003 R: Send + 'static,
3004 {
3005 let (result_tx, result_rx) = futures::channel::oneshot::channel();
3006 let job_id = post_inc(&mut self.job_id);
3007 let this = self.this.clone();
3008 self.job_sender
3009 .unbounded_send(GitJob {
3010 key,
3011 job: Box::new(move |state, cx: &mut AsyncApp| {
3012 let job = job(state, cx.clone());
3013 cx.spawn(async move |cx| {
3014 if let Some(s) = status.clone() {
3015 this.update(cx, |this, cx| {
3016 this.active_jobs.insert(
3017 job_id,
3018 JobInfo {
3019 start: Instant::now(),
3020 message: s.clone(),
3021 },
3022 );
3023
3024 cx.notify();
3025 })
3026 .ok();
3027 }
3028 let result = job.await;
3029
3030 this.update(cx, |this, cx| {
3031 this.active_jobs.remove(&job_id);
3032 cx.notify();
3033 })
3034 .ok();
3035
3036 result_tx.send(result).ok();
3037 })
3038 }),
3039 })
3040 .ok();
3041 result_rx
3042 }
3043
3044 pub fn set_as_active_repository(&self, cx: &mut Context<Self>) {
3045 let Some(git_store) = self.git_store.upgrade() else {
3046 return;
3047 };
3048 let entity = cx.entity();
3049 git_store.update(cx, |git_store, cx| {
3050 let Some((&id, _)) = git_store
3051 .repositories
3052 .iter()
3053 .find(|(_, handle)| *handle == &entity)
3054 else {
3055 return;
3056 };
3057 git_store.active_repo_id = Some(id);
3058 cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
3059 });
3060 }
3061
3062 pub fn cached_status(&self) -> impl '_ + Iterator<Item = StatusEntry> {
3063 self.snapshot.status()
3064 }
3065
3066 pub fn repo_path_to_project_path(&self, path: &RepoPath, cx: &App) -> Option<ProjectPath> {
3067 let git_store = self.git_store.upgrade()?;
3068 let worktree_store = git_store.read(cx).worktree_store.read(cx);
3069 let abs_path = self.snapshot.work_directory_abs_path.join(&path.0);
3070 let (worktree, relative_path) = worktree_store.find_worktree(abs_path, cx)?;
3071 Some(ProjectPath {
3072 worktree_id: worktree.read(cx).id(),
3073 path: relative_path.into(),
3074 })
3075 }
3076
3077 pub fn project_path_to_repo_path(&self, path: &ProjectPath, cx: &App) -> Option<RepoPath> {
3078 let git_store = self.git_store.upgrade()?;
3079 let worktree_store = git_store.read(cx).worktree_store.read(cx);
3080 let abs_path = worktree_store.absolutize(path, cx)?;
3081 self.snapshot.abs_path_to_repo_path(&abs_path)
3082 }
3083
3084 pub fn contains_sub_repo(&self, other: &Entity<Self>, cx: &App) -> bool {
3085 other
3086 .read(cx)
3087 .snapshot
3088 .work_directory_abs_path
3089 .starts_with(&self.snapshot.work_directory_abs_path)
3090 }
3091
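/// Returns the commit message buffer for this repository, creating it on
/// first use. Locally the buffer is created in the `BufferStore`; for remote
/// repositories it is requested from the host. In both cases the buffer is
/// tagged with the "Git Commit" language when a language registry is
/// provided, and cached for subsequent calls.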
3092 pub fn open_commit_buffer(
3093 &mut self,
3094 languages: Option<Arc<LanguageRegistry>>,
3095 buffer_store: Entity<BufferStore>,
3096 cx: &mut Context<Self>,
3097 ) -> Task<Result<Entity<Buffer>>> {
3098 let id = self.id;
3099 if let Some(buffer) = self.commit_message_buffer.clone() {
3100 return Task::ready(Ok(buffer));
3101 }
3102 let this = cx.weak_entity();
3103
3104 let rx = self.send_job(None, move |state, mut cx| async move {
3105 let Some(this) = this.upgrade() else {
3106 bail!("git store was dropped");
3107 };
3108 match state {
3109 RepositoryState::Local { .. } => {
3110 this.update(&mut cx, |_, cx| {
3111 Self::open_local_commit_buffer(languages, buffer_store, cx)
3112 })?
3113 .await
3114 }
3115 RepositoryState::Remote { project_id, client } => {
3116 let request = client.request(proto::OpenCommitMessageBuffer {
3117 project_id: project_id.0,
3118 repository_id: id.to_proto(),
3119 });
3120 let response = request.await.context("requesting to open commit buffer")?;
3121 let buffer_id = BufferId::new(response.buffer_id)?;
3122 let buffer = buffer_store
3123 .update(&mut cx, |buffer_store, cx| {
3124 buffer_store.wait_for_remote_buffer(buffer_id, cx)
3125 })?
3126 .await?;
3127 if let Some(language_registry) = languages {
3128 let git_commit_language =
3129 language_registry.language_for_name("Git Commit").await?;
3130 buffer.update(&mut cx, |buffer, cx| {
3131 buffer.set_language(Some(git_commit_language), cx);
3132 })?;
3133 }
3134 this.update(&mut cx, |this, _| {
3135 this.commit_message_buffer = Some(buffer.clone());
3136 })?;
3137 Ok(buffer)
3138 }
3139 }
3140 });
3141
3142 cx.spawn(|_, _: &mut AsyncApp| async move { rx.await? })
3143 }
3144
3145 fn open_local_commit_buffer(
3146 language_registry: Option<Arc<LanguageRegistry>>,
3147 buffer_store: Entity<BufferStore>,
3148 cx: &mut Context<Self>,
3149 ) -> Task<Result<Entity<Buffer>>> {
3150 cx.spawn(async move |repository, cx| {
3151 let buffer = buffer_store
3152 .update(cx, |buffer_store, cx| buffer_store.create_buffer(cx))?
3153 .await?;
3154
3155 if let Some(language_registry) = language_registry {
3156 let git_commit_language = language_registry.language_for_name("Git Commit").await?;
3157 buffer.update(cx, |buffer, cx| {
3158 buffer.set_language(Some(git_commit_language), cx);
3159 })?;
3160 }
3161
3162 repository.update(cx, |repository, _| {
3163 repository.commit_message_buffer = Some(buffer.clone());
3164 })?;
3165 Ok(buffer)
3166 })
3167 }
3168
3169 pub fn checkout_files(
3170 &mut self,
3171 commit: &str,
3172 paths: Vec<RepoPath>,
3173 _cx: &mut App,
3174 ) -> oneshot::Receiver<Result<()>> {
3175 let commit = commit.to_string();
3176 let id = self.id;
3177
3178 self.send_job(
3179 Some(format!("git checkout {}", commit).into()),
3180 move |git_repo, _| async move {
3181 match git_repo {
3182 RepositoryState::Local {
3183 backend,
3184 environment,
3185 ..
3186 } => {
3187 backend
3188 .checkout_files(commit, paths, environment.clone())
3189 .await
3190 }
3191 RepositoryState::Remote { project_id, client } => {
3192 client
3193 .request(proto::GitCheckoutFiles {
3194 project_id: project_id.0,
3195 repository_id: id.to_proto(),
3196 commit,
3197 paths: paths
3198 .into_iter()
3199 .map(|p| p.to_string_lossy().to_string())
3200 .collect(),
3201 })
3202 .await?;
3203
3204 Ok(())
3205 }
3206 }
3207 },
3208 )
3209 }
3210
3211 pub fn reset(
3212 &mut self,
3213 commit: String,
3214 reset_mode: ResetMode,
3215 _cx: &mut App,
3216 ) -> oneshot::Receiver<Result<()>> {
3218 let id = self.id;
3219
3220 self.send_job(None, move |git_repo, _| async move {
3221 match git_repo {
3222 RepositoryState::Local {
3223 backend,
3224 environment,
3225 ..
3226 } => backend.reset(commit, reset_mode, environment).await,
3227 RepositoryState::Remote { project_id, client } => {
3228 client
3229 .request(proto::GitReset {
3230 project_id: project_id.0,
3231 repository_id: id.to_proto(),
3232 commit,
3233 mode: match reset_mode {
3234 ResetMode::Soft => git_reset::ResetMode::Soft.into(),
3235 ResetMode::Mixed => git_reset::ResetMode::Mixed.into(),
3236 },
3237 })
3238 .await?;
3239
3240 Ok(())
3241 }
3242 }
3243 })
3244 }
3245
3246 pub fn show(&mut self, commit: String) -> oneshot::Receiver<Result<CommitDetails>> {
3247 let id = self.id;
3248 self.send_job(None, move |git_repo, _cx| async move {
3249 match git_repo {
3250 RepositoryState::Local { backend, .. } => backend.show(commit).await,
3251 RepositoryState::Remote { project_id, client } => {
3252 let resp = client
3253 .request(proto::GitShow {
3254 project_id: project_id.0,
3255 repository_id: id.to_proto(),
3256 commit,
3257 })
3258 .await?;
3259
3260 Ok(CommitDetails {
3261 sha: resp.sha.into(),
3262 message: resp.message.into(),
3263 commit_timestamp: resp.commit_timestamp,
3264 author_email: resp.author_email.into(),
3265 author_name: resp.author_name.into(),
3266 })
3267 }
3268 }
3269 })
3270 }
3271
3272 pub fn load_commit_diff(&mut self, commit: String) -> oneshot::Receiver<Result<CommitDiff>> {
3273 let id = self.id;
3274 self.send_job(None, move |git_repo, cx| async move {
3275 match git_repo {
3276 RepositoryState::Local { backend, .. } => backend.load_commit(commit, cx).await,
3277 RepositoryState::Remote {
3278 client, project_id, ..
3279 } => {
3280 let response = client
3281 .request(proto::LoadCommitDiff {
3282 project_id: project_id.0,
3283 repository_id: id.to_proto(),
3284 commit,
3285 })
3286 .await?;
3287 Ok(CommitDiff {
3288 files: response
3289 .files
3290 .into_iter()
3291 .map(|file| CommitFile {
3292 path: Path::new(&file.path).into(),
3293 old_text: file.old_text,
3294 new_text: file.new_text,
3295 })
3296 .collect(),
3297 })
3298 }
3299 }
3300 })
3301 }
3302
3303 fn buffer_store(&self, cx: &App) -> Option<Entity<BufferStore>> {
3304 Some(self.git_store.upgrade()?.read(cx).buffer_store.clone())
3305 }
3306
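/// Stages the given paths. Any open buffers for those paths that still exist
/// on disk are saved first, then the entries are staged via the local backend
/// or a `proto::Stage` request.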
3307 pub fn stage_entries(
3308 &self,
3309 entries: Vec<RepoPath>,
3310 cx: &mut Context<Self>,
3311 ) -> Task<anyhow::Result<()>> {
3312 if entries.is_empty() {
3313 return Task::ready(Ok(()));
3314 }
3315 let id = self.id;
3316
3317 let mut save_futures = Vec::new();
3318 if let Some(buffer_store) = self.buffer_store(cx) {
3319 buffer_store.update(cx, |buffer_store, cx| {
3320 for path in &entries {
3321 let Some(project_path) = self.repo_path_to_project_path(path, cx) else {
3322 continue;
3323 };
3324 if let Some(buffer) = buffer_store.get_by_path(&project_path, cx) {
3325 if buffer
3326 .read(cx)
3327 .file()
3328 .map_or(false, |file| file.disk_state().exists())
3329 {
3330 save_futures.push(buffer_store.save_buffer(buffer, cx));
3331 }
3332 }
3333 }
3334 })
3335 }
3336
3337 cx.spawn(async move |this, cx| {
3338 for save_future in save_futures {
3339 save_future.await?;
3340 }
3341
3342 this.update(cx, |this, _| {
3343 this.send_job(None, move |git_repo, _cx| async move {
3344 match git_repo {
3345 RepositoryState::Local {
3346 backend,
3347 environment,
3348 ..
3349 } => backend.stage_paths(entries, environment.clone()).await,
3350 RepositoryState::Remote { project_id, client } => {
3351 client
3352 .request(proto::Stage {
3353 project_id: project_id.0,
3354 repository_id: id.to_proto(),
3355 paths: entries
3356 .into_iter()
3357 .map(|repo_path| repo_path.as_ref().to_proto())
3358 .collect(),
3359 })
3360 .await
3361 .context("sending stage request")?;
3362
3363 Ok(())
3364 }
3365 }
3366 })
3367 })?
3368 .await??;
3369
3370 Ok(())
3371 })
3372 }
3373
3374 pub fn unstage_entries(
3375 &self,
3376 entries: Vec<RepoPath>,
3377 cx: &mut Context<Self>,
3378 ) -> Task<anyhow::Result<()>> {
3379 if entries.is_empty() {
3380 return Task::ready(Ok(()));
3381 }
3382 let id = self.id;
3383
3384 let mut save_futures = Vec::new();
3385 if let Some(buffer_store) = self.buffer_store(cx) {
3386 buffer_store.update(cx, |buffer_store, cx| {
3387 for path in &entries {
3388 let Some(project_path) = self.repo_path_to_project_path(path, cx) else {
3389 continue;
3390 };
3391 if let Some(buffer) = buffer_store.get_by_path(&project_path, cx) {
3392 if buffer
3393 .read(cx)
3394 .file()
3395 .map_or(false, |file| file.disk_state().exists())
3396 {
3397 save_futures.push(buffer_store.save_buffer(buffer, cx));
3398 }
3399 }
3400 }
3401 })
3402 }
3403
3404 cx.spawn(async move |this, cx| {
3405 for save_future in save_futures {
3406 save_future.await?;
3407 }
3408
3409 this.update(cx, |this, _| {
3410 this.send_job(None, move |git_repo, _cx| async move {
3411 match git_repo {
3412 RepositoryState::Local {
3413 backend,
3414 environment,
3415 ..
3416 } => backend.unstage_paths(entries, environment).await,
3417 RepositoryState::Remote { project_id, client } => {
3418 client
3419 .request(proto::Unstage {
3420 project_id: project_id.0,
3421 repository_id: id.to_proto(),
3422 paths: entries
3423 .into_iter()
3424 .map(|repo_path| repo_path.as_ref().to_proto())
3425 .collect(),
3426 })
3427 .await
3428 .context("sending unstage request")?;
3429
3430 Ok(())
3431 }
3432 }
3433 })
3434 })?
3435 .await??;
3436
3437 Ok(())
3438 })
3439 }
3440
3441 pub fn stage_all(&self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
3442 let to_stage = self
3443 .cached_status()
3444 .filter(|entry| !entry.status.staging().is_fully_staged())
3445 .map(|entry| entry.repo_path.clone())
3446 .collect();
3447 self.stage_entries(to_stage, cx)
3448 }
3449
3450 pub fn unstage_all(&self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
3451 let to_unstage = self
3452 .cached_status()
3453 .filter(|entry| entry.status.staging().has_staged())
3454 .map(|entry| entry.repo_path.clone())
3455 .collect();
3456 self.unstage_entries(to_unstage, cx)
3457 }
3458
3459 pub fn commit(
3460 &mut self,
3461 message: SharedString,
3462 name_and_email: Option<(SharedString, SharedString)>,
3463 options: CommitOptions,
3464 _cx: &mut App,
3465 ) -> oneshot::Receiver<Result<()>> {
3466 let id = self.id;
3467
3468 self.send_job(Some("git commit".into()), move |git_repo, _cx| async move {
3469 match git_repo {
3470 RepositoryState::Local {
3471 backend,
3472 environment,
3473 ..
3474 } => {
3475 backend
3476 .commit(message, name_and_email, options, environment)
3477 .await
3478 }
3479 RepositoryState::Remote { project_id, client } => {
3480 let (name, email) = name_and_email.unzip();
3481 client
3482 .request(proto::Commit {
3483 project_id: project_id.0,
3484 repository_id: id.to_proto(),
3485 message: String::from(message),
3486 name: name.map(String::from),
3487 email: email.map(String::from),
3488 options: Some(proto::commit::CommitOptions {
3489 amend: options.amend,
3490 }),
3491 })
3492 .await
3493 .context("sending commit request")?;
3494
3495 Ok(())
3496 }
3497 }
3498 })
3499 }
3500
3501 pub fn fetch(
3502 &mut self,
3503 askpass: AskPassDelegate,
3504 _cx: &mut App,
3505 ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
3506 let askpass_delegates = self.askpass_delegates.clone();
3507 let askpass_id = util::post_inc(&mut self.latest_askpass_id);
3508 let id = self.id;
3509
3510 self.send_job(Some("git fetch".into()), move |git_repo, cx| async move {
3511 match git_repo {
3512 RepositoryState::Local {
3513 backend,
3514 environment,
3515 ..
3516 } => backend.fetch(askpass, environment, cx).await,
3517 RepositoryState::Remote { project_id, client } => {
3518 askpass_delegates.lock().insert(askpass_id, askpass);
3519 let _defer = util::defer(|| {
3520 let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
3521 debug_assert!(askpass_delegate.is_some());
3522 });
3523
3524 let response = client
3525 .request(proto::Fetch {
3526 project_id: project_id.0,
3527 repository_id: id.to_proto(),
3528 askpass_id,
3529 })
3530 .await
3531 .context("sending fetch request")?;
3532
3533 Ok(RemoteCommandOutput {
3534 stdout: response.stdout,
3535 stderr: response.stderr,
3536 })
3537 }
3538 }
3539 })
3540 }
3541
3542 pub fn push(
3543 &mut self,
3544 branch: SharedString,
3545 remote: SharedString,
3546 options: Option<PushOptions>,
3547 askpass: AskPassDelegate,
3548 cx: &mut Context<Self>,
3549 ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
3550 let askpass_delegates = self.askpass_delegates.clone();
3551 let askpass_id = util::post_inc(&mut self.latest_askpass_id);
3552 let id = self.id;
3553
3554 let args = options
3555 .map(|option| match option {
3556 PushOptions::SetUpstream => " --set-upstream",
3557 PushOptions::Force => " --force-with-lease",
3558 })
3559 .unwrap_or("");
3560
3561 let updates_tx = self
3562 .git_store()
3563 .and_then(|git_store| match &git_store.read(cx).state {
3564 GitStoreState::Local { downstream, .. } => downstream
3565 .as_ref()
3566 .map(|downstream| downstream.updates_tx.clone()),
3567 _ => None,
3568 });
3569
3570 let this = cx.weak_entity();
3571 self.send_job(
3572 Some(format!("git push{} {} {}", args, remote, branch).into()),
3573 move |git_repo, mut cx| async move {
3574 match git_repo {
3575 RepositoryState::Local {
3576 backend,
3577 environment,
3578 ..
3579 } => {
3580 let result = backend
3581 .push(
3582 branch.to_string(),
3583 remote.to_string(),
3584 options,
3585 askpass,
3586 environment.clone(),
3587 cx.clone(),
3588 )
3589 .await;
3590 if result.is_ok() {
3591 let branches = backend.branches().await?;
3592 let branch = branches.into_iter().find(|branch| branch.is_head);
3593 log::info!("head branch after push is {branch:?}");
3594 let snapshot = this.update(&mut cx, |this, cx| {
3595 this.snapshot.branch = branch;
3596 let snapshot = this.snapshot.clone();
3597 cx.emit(RepositoryEvent::Updated { full_scan: false });
3598 snapshot
3599 })?;
3600 if let Some(updates_tx) = updates_tx {
3601 updates_tx
3602 .unbounded_send(DownstreamUpdate::UpdateRepository(snapshot))
3603 .ok();
3604 }
3605 }
3606 result
3607 }
3608 RepositoryState::Remote { project_id, client } => {
3609 askpass_delegates.lock().insert(askpass_id, askpass);
3610 let _defer = util::defer(|| {
3611 let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
3612 debug_assert!(askpass_delegate.is_some());
3613 });
3614 let response = client
3615 .request(proto::Push {
3616 project_id: project_id.0,
3617 repository_id: id.to_proto(),
3618 askpass_id,
3619 branch_name: branch.to_string(),
3620 remote_name: remote.to_string(),
3621 options: options.map(|options| match options {
3622 PushOptions::Force => proto::push::PushOptions::Force,
3623 PushOptions::SetUpstream => {
3624 proto::push::PushOptions::SetUpstream
3625 }
3626 }
3627 as i32),
3628 })
3629 .await
3630 .context("sending push request")?;
3631
3632 Ok(RemoteCommandOutput {
3633 stdout: response.stdout,
3634 stderr: response.stderr,
3635 })
3636 }
3637 }
3638 },
3639 )
3640 }
3641
3642 pub fn pull(
3643 &mut self,
3644 branch: SharedString,
3645 remote: SharedString,
3646 askpass: AskPassDelegate,
3647 _cx: &mut App,
3648 ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
3649 let askpass_delegates = self.askpass_delegates.clone();
3650 let askpass_id = util::post_inc(&mut self.latest_askpass_id);
3651 let id = self.id;
3652
3653 self.send_job(
3654 Some(format!("git pull {} {}", remote, branch).into()),
3655 move |git_repo, cx| async move {
3656 match git_repo {
3657 RepositoryState::Local {
3658 backend,
3659 environment,
3660 ..
3661 } => {
3662 backend
3663 .pull(
3664 branch.to_string(),
3665 remote.to_string(),
3666 askpass,
3667 environment.clone(),
3668 cx,
3669 )
3670 .await
3671 }
3672 RepositoryState::Remote { project_id, client } => {
3673 askpass_delegates.lock().insert(askpass_id, askpass);
3674 let _defer = util::defer(|| {
3675 let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
3676 debug_assert!(askpass_delegate.is_some());
3677 });
3678 let response = client
3679 .request(proto::Pull {
3680 project_id: project_id.0,
3681 repository_id: id.to_proto(),
3682 askpass_id,
3683 branch_name: branch.to_string(),
3684 remote_name: remote.to_string(),
3685 })
3686 .await
3687 .context("sending pull request")?;
3688
3689 Ok(RemoteCommandOutput {
3690 stdout: response.stdout,
3691 stderr: response.stderr,
3692 })
3693 }
3694 }
3695 },
3696 )
3697 }
3698
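/// Updates the index text for `path`, passing `content` through to the local
/// backend or the remote host. The job is keyed by
/// `GitJobKey::WriteIndex(path)` so writes to the same index entry run in
/// order, and once the write finishes the buffer's
/// `hunk_staging_operation_count_as_of_write` is updated so that pending hunk
/// staging state can be reconciled by later diff recalculations.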
3699 fn spawn_set_index_text_job(
3700 &mut self,
3701 path: RepoPath,
3702 content: Option<String>,
3703 hunk_staging_operation_count: Option<usize>,
3704 cx: &mut Context<Self>,
3705 ) -> oneshot::Receiver<anyhow::Result<()>> {
3706 let id = self.id;
3707 let this = cx.weak_entity();
3708 let git_store = self.git_store.clone();
3709 self.send_keyed_job(
3710 Some(GitJobKey::WriteIndex(path.clone())),
3711 None,
3712 move |git_repo, mut cx| async move {
3713 log::debug!("start updating index text for buffer {}", path.display());
3714 match git_repo {
3715 RepositoryState::Local {
3716 backend,
3717 environment,
3718 ..
3719 } => {
3720 backend
3721 .set_index_text(path.clone(), content, environment.clone())
3722 .await?;
3723 }
3724 RepositoryState::Remote { project_id, client } => {
3725 client
3726 .request(proto::SetIndexText {
3727 project_id: project_id.0,
3728 repository_id: id.to_proto(),
3729 path: path.as_ref().to_proto(),
3730 text: content,
3731 })
3732 .await?;
3733 }
3734 }
3735 log::debug!("finish updating index text for buffer {}", path.display());
3736
3737 if let Some(hunk_staging_operation_count) = hunk_staging_operation_count {
3738 let project_path = this
3739 .read_with(&cx, |this, cx| this.repo_path_to_project_path(&path, cx))
3740 .ok()
3741 .flatten();
3742 git_store.update(&mut cx, |git_store, cx| {
3743 let buffer_id = git_store
3744 .buffer_store
3745 .read(cx)
3746 .get_by_path(&project_path?, cx)?
3747 .read(cx)
3748 .remote_id();
3749 let diff_state = git_store.diffs.get(&buffer_id)?;
3750 diff_state.update(cx, |diff_state, _| {
3751 diff_state.hunk_staging_operation_count_as_of_write =
3752 hunk_staging_operation_count;
3753 });
3754 Some(())
3755 })?;
3756 }
3757 Ok(())
3758 },
3759 )
3760 }
3761
3762 pub fn get_remotes(
3763 &mut self,
3764 branch_name: Option<String>,
3765 ) -> oneshot::Receiver<Result<Vec<Remote>>> {
3766 let id = self.id;
3767 self.send_job(None, move |repo, _cx| async move {
3768 match repo {
3769 RepositoryState::Local { backend, .. } => backend.get_remotes(branch_name).await,
3770 RepositoryState::Remote { project_id, client } => {
3771 let response = client
3772 .request(proto::GetRemotes {
3773 project_id: project_id.0,
3774 repository_id: id.to_proto(),
3775 branch_name,
3776 })
3777 .await?;
3778
3779 let remotes = response
3780 .remotes
3781 .into_iter()
3782 .map(|remotes| git::repository::Remote {
3783 name: remotes.name.into(),
3784 })
3785 .collect();
3786
3787 Ok(remotes)
3788 }
3789 }
3790 })
3791 }
3792
3793 pub fn branches(&mut self) -> oneshot::Receiver<Result<Vec<Branch>>> {
3794 let id = self.id;
3795 self.send_job(None, move |repo, _| async move {
3796 match repo {
3797 RepositoryState::Local { backend, .. } => backend.branches().await,
3798 RepositoryState::Remote { project_id, client } => {
3799 let response = client
3800 .request(proto::GitGetBranches {
3801 project_id: project_id.0,
3802 repository_id: id.to_proto(),
3803 })
3804 .await?;
3805
3806 let branches = response
3807 .branches
3808 .into_iter()
3809 .map(|branch| proto_to_branch(&branch))
3810 .collect();
3811
3812 Ok(branches)
3813 }
3814 }
3815 })
3816 }
3817
3818 pub fn diff(&mut self, diff_type: DiffType, _cx: &App) -> oneshot::Receiver<Result<String>> {
3819 let id = self.id;
3820 self.send_job(None, move |repo, _cx| async move {
3821 match repo {
3822 RepositoryState::Local { backend, .. } => backend.diff(diff_type).await,
3823 RepositoryState::Remote { project_id, client } => {
3824 let response = client
3825 .request(proto::GitDiff {
3826 project_id: project_id.0,
3827 repository_id: id.to_proto(),
3828 diff_type: match diff_type {
3829 DiffType::HeadToIndex => {
3830 proto::git_diff::DiffType::HeadToIndex.into()
3831 }
3832 DiffType::HeadToWorktree => {
3833 proto::git_diff::DiffType::HeadToWorktree.into()
3834 }
3835 },
3836 })
3837 .await?;
3838
3839 Ok(response.diff)
3840 }
3841 }
3842 })
3843 }
3844
3845 pub fn create_branch(&mut self, branch_name: String) -> oneshot::Receiver<Result<()>> {
3846 let id = self.id;
3847 self.send_job(
3848 Some(format!("git switch -c {branch_name}").into()),
3849 move |repo, _cx| async move {
3850 match repo {
3851 RepositoryState::Local { backend, .. } => {
3852 backend.create_branch(branch_name).await
3853 }
3854 RepositoryState::Remote { project_id, client } => {
3855 client
3856 .request(proto::GitCreateBranch {
3857 project_id: project_id.0,
3858 repository_id: id.to_proto(),
3859 branch_name,
3860 })
3861 .await?;
3862
3863 Ok(())
3864 }
3865 }
3866 },
3867 )
3868 }
3869
3870 pub fn change_branch(&mut self, branch_name: String) -> oneshot::Receiver<Result<()>> {
3871 let id = self.id;
3872 self.send_job(
3873 Some(format!("git switch {branch_name}").into()),
3874 move |repo, _cx| async move {
3875 match repo {
3876 RepositoryState::Local { backend, .. } => {
3877 backend.change_branch(branch_name).await
3878 }
3879 RepositoryState::Remote { project_id, client } => {
3880 client
3881 .request(proto::GitChangeBranch {
3882 project_id: project_id.0,
3883 repository_id: id.to_proto(),
3884 branch_name,
3885 })
3886 .await?;
3887
3888 Ok(())
3889 }
3890 }
3891 },
3892 )
3893 }
3894
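    /// Returns the names of the remote refs that the current commits have
    /// already been pushed to.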
3895 pub fn check_for_pushed_commits(&mut self) -> oneshot::Receiver<Result<Vec<SharedString>>> {
3896 let id = self.id;
3897 self.send_job(None, move |repo, _cx| async move {
3898 match repo {
3899 RepositoryState::Local { backend, .. } => backend.check_for_pushed_commit().await,
3900 RepositoryState::Remote { project_id, client } => {
3901 let response = client
3902 .request(proto::CheckForPushedCommits {
3903 project_id: project_id.0,
3904 repository_id: id.to_proto(),
3905 })
3906 .await?;
3907
3908 let branches = response.pushed_to.into_iter().map(Into::into).collect();
3909
3910 Ok(branches)
3911 }
3912 }
3913 })
3914 }
3915
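    /// Captures a checkpoint of the repository's current state. Not yet
    /// implemented for remote repositories.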
3916 pub fn checkpoint(&mut self) -> oneshot::Receiver<Result<GitRepositoryCheckpoint>> {
3917 self.send_job(None, |repo, _cx| async move {
3918 match repo {
3919 RepositoryState::Local { backend, .. } => backend.checkpoint().await,
3920 RepositoryState::Remote { .. } => anyhow::bail!("not implemented yet"),
3921 }
3922 })
3923 }
3924
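    /// Restores the repository to a previously captured checkpoint. Not yet
    /// implemented for remote repositories.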
3925 pub fn restore_checkpoint(
3926 &mut self,
3927 checkpoint: GitRepositoryCheckpoint,
3928 ) -> oneshot::Receiver<Result<()>> {
3929 self.send_job(None, move |repo, _cx| async move {
3930 match repo {
3931 RepositoryState::Local { backend, .. } => {
3932 backend.restore_checkpoint(checkpoint).await
3933 }
3934 RepositoryState::Remote { .. } => anyhow::bail!("not implemented yet"),
3935 }
3936 })
3937 }
3938
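    /// Applies an `UpdateRepository` message received from the host, replacing
    /// the branch, head commit, merge-conflict paths, and updated or removed
    /// file statuses in this repository's snapshot.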
3939 pub(crate) fn apply_remote_update(
3940 &mut self,
3941 update: proto::UpdateRepository,
3942 cx: &mut Context<Self>,
3943 ) -> Result<()> {
3944 let conflicted_paths = TreeSet::from_ordered_entries(
3945 update
3946 .current_merge_conflicts
3947 .into_iter()
3948 .map(|path| RepoPath(Path::new(&path).into())),
3949 );
3950 self.snapshot.branch = update.branch_summary.as_ref().map(proto_to_branch);
3951 self.snapshot.head_commit = update
3952 .head_commit_details
3953 .as_ref()
3954 .map(proto_to_commit_details);
3955
3956 self.snapshot.merge.conflicted_paths = conflicted_paths;
3957
3958 let edits = update
3959 .removed_statuses
3960 .into_iter()
3961 .map(|path| sum_tree::Edit::Remove(PathKey(FromProto::from_proto(path))))
3962 .chain(
3963 update
3964 .updated_statuses
3965 .into_iter()
3966 .filter_map(|updated_status| {
3967 Some(sum_tree::Edit::Insert(updated_status.try_into().log_err()?))
3968 }),
3969 )
3970 .collect::<Vec<_>>();
3971 self.snapshot.statuses_by_path.edit(edits, &());
3972 if update.is_last_update {
3973 self.snapshot.scan_id = update.scan_id;
3974 }
3975 cx.emit(RepositoryEvent::Updated { full_scan: true });
3976 Ok(())
3977 }
3978
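    /// Compares two checkpoints, resolving to whether they are equivalent. Not
    /// yet implemented for remote repositories.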
3979 pub fn compare_checkpoints(
3980 &mut self,
3981 left: GitRepositoryCheckpoint,
3982 right: GitRepositoryCheckpoint,
3983 ) -> oneshot::Receiver<Result<bool>> {
3984 self.send_job(None, move |repo, _cx| async move {
3985 match repo {
3986 RepositoryState::Local { backend, .. } => {
3987 backend.compare_checkpoints(left, right).await
3988 }
3989 RepositoryState::Remote { .. } => anyhow::bail!("not implemented yet"),
3990 }
3991 })
3992 }
3993
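    /// Produces a textual diff between two checkpoints. Not yet implemented for
    /// remote repositories.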
3994 pub fn diff_checkpoints(
3995 &mut self,
3996 base_checkpoint: GitRepositoryCheckpoint,
3997 target_checkpoint: GitRepositoryCheckpoint,
3998 ) -> oneshot::Receiver<Result<String>> {
3999 self.send_job(None, move |repo, _cx| async move {
4000 match repo {
4001 RepositoryState::Local { backend, .. } => {
4002 backend
4003 .diff_checkpoints(base_checkpoint, target_checkpoint)
4004 .await
4005 }
4006 RepositoryState::Remote { .. } => anyhow::bail!("not implemented yet"),
4007 }
4008 })
4009 }
4010
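    /// Schedules a full status scan of this local repository. The scan runs as a
    /// keyed job, so a queued scan is skipped when a newer one is already
    /// pending; on completion the new snapshot replaces the current one and is
    /// forwarded downstream if `updates_tx` is provided.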
4011 fn schedule_scan(
4012 &mut self,
4013 updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
4014 cx: &mut Context<Self>,
4015 ) {
4016 let this = cx.weak_entity();
4017 let _ = self.send_keyed_job(
4018 Some(GitJobKey::ReloadGitState),
4019 None,
4020 |state, mut cx| async move {
4021 log::debug!("run scheduled git status scan");
4022
4023 let Some(this) = this.upgrade() else {
4024 return Ok(());
4025 };
4026 let RepositoryState::Local { backend, .. } = state else {
4027 bail!("not a local repository")
4028 };
4029 let (snapshot, events) = this
4030 .read_with(&mut cx, |this, _| {
4031 compute_snapshot(
4032 this.id,
4033 this.work_directory_abs_path.clone(),
4034 this.snapshot.clone(),
4035 backend.clone(),
4036 )
4037 })?
4038 .await?;
4039 this.update(&mut cx, |this, cx| {
4040 this.snapshot = snapshot.clone();
4041 for event in events {
4042 cx.emit(event);
4043 }
4044 })?;
4045 if let Some(updates_tx) = updates_tx {
4046 updates_tx
4047 .unbounded_send(DownstreamUpdate::UpdateRepository(snapshot))
4048 .ok();
4049 }
4050 Ok(())
4051 },
4052 );
4053 }
4054
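    /// Spawns the worker task that services git jobs for a local repository: it
    /// resolves the directory environment, opens the repository backend via the
    /// filesystem, and then processes jobs in order, skipping any keyed job that
    /// has been superseded by a later job with the same key.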
4055 fn spawn_local_git_worker(
4056 work_directory_abs_path: Arc<Path>,
4057 dot_git_abs_path: Arc<Path>,
4058 _repository_dir_abs_path: Arc<Path>,
4059 _common_dir_abs_path: Arc<Path>,
4060 project_environment: WeakEntity<ProjectEnvironment>,
4061 fs: Arc<dyn Fs>,
4062 cx: &mut Context<Self>,
4063 ) -> mpsc::UnboundedSender<GitJob> {
4064 let (job_tx, mut job_rx) = mpsc::unbounded::<GitJob>();
4065
4066 cx.spawn(async move |_, cx| {
4067 let environment = project_environment
4068 .upgrade()
4069 .context("missing project environment")?
4070 .update(cx, |project_environment, cx| {
4071 project_environment.get_directory_environment(work_directory_abs_path.clone(), cx)
4072 })?
4073 .await
4074 .unwrap_or_else(|| {
4075 log::error!("failed to get working directory environment for repository {work_directory_abs_path:?}");
4076 HashMap::default()
4077 });
4078 let backend = cx
4079 .background_spawn(async move {
4080 fs.open_repo(&dot_git_abs_path)
4081 .with_context(|| format!("opening repository at {dot_git_abs_path:?}"))
4082 })
4083 .await?;
4084
4085 if let Some(git_hosting_provider_registry) =
4086 cx.update(|cx| GitHostingProviderRegistry::try_global(cx))?
4087 {
4088 git_hosting_providers::register_additional_providers(
4089 git_hosting_provider_registry,
4090 backend.clone(),
4091 );
4092 }
4093
4094 let state = RepositoryState::Local {
4095 backend,
4096 environment: Arc::new(environment),
4097 };
4098 let mut jobs = VecDeque::new();
4099 loop {
4100 while let Ok(Some(next_job)) = job_rx.try_next() {
4101 jobs.push_back(next_job);
4102 }
4103
4104 if let Some(job) = jobs.pop_front() {
4105 if let Some(current_key) = &job.key {
4106 if jobs
4107 .iter()
4108 .any(|other_job| other_job.key.as_ref() == Some(current_key))
4109 {
4110 continue;
4111 }
4112 }
4113 (job.job)(state.clone(), cx).await;
4114 } else if let Some(job) = job_rx.next().await {
4115 jobs.push_back(job);
4116 } else {
4117 break;
4118 }
4119 }
4120 anyhow::Ok(())
4121 })
4122 .detach_and_log_err(cx);
4123
4124 job_tx
4125 }
4126
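    /// Spawns the worker task that services git jobs for a remote repository,
    /// where each job runs against an RPC client instead of a local backend.
    /// Uses the same keyed-job deduplication as the local worker.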
4127 fn spawn_remote_git_worker(
4128 project_id: ProjectId,
4129 client: AnyProtoClient,
4130 cx: &mut Context<Self>,
4131 ) -> mpsc::UnboundedSender<GitJob> {
4132 let (job_tx, mut job_rx) = mpsc::unbounded::<GitJob>();
4133
4134 cx.spawn(async move |_, cx| {
4135 let state = RepositoryState::Remote { project_id, client };
4136 let mut jobs = VecDeque::new();
4137 loop {
4138 while let Ok(Some(next_job)) = job_rx.try_next() {
4139 jobs.push_back(next_job);
4140 }
4141
4142 if let Some(job) = jobs.pop_front() {
4143 if let Some(current_key) = &job.key {
4144 if jobs
4145 .iter()
4146 .any(|other_job| other_job.key.as_ref() == Some(current_key))
4147 {
4148 continue;
4149 }
4150 }
4151 (job.job)(state.clone(), cx).await;
4152 } else if let Some(job) = job_rx.next().await {
4153 jobs.push_back(job);
4154 } else {
4155 break;
4156 }
4157 }
4158 anyhow::Ok(())
4159 })
4160 .detach_and_log_err(cx);
4161
4162 job_tx
4163 }
4164
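    /// Loads the staged (index) text for the given path, reading it from the
    /// local backend or requesting it from the host via `OpenUnstagedDiff`.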
4165 fn load_staged_text(
4166 &mut self,
4167 buffer_id: BufferId,
4168 repo_path: RepoPath,
4169 cx: &App,
4170 ) -> Task<Result<Option<String>>> {
4171 let rx = self.send_job(None, move |state, _| async move {
4172 match state {
4173 RepositoryState::Local { backend, .. } => {
4174 anyhow::Ok(backend.load_index_text(repo_path).await)
4175 }
4176 RepositoryState::Remote { project_id, client } => {
4177 let response = client
4178 .request(proto::OpenUnstagedDiff {
4179 project_id: project_id.to_proto(),
4180 buffer_id: buffer_id.to_proto(),
4181 })
4182 .await?;
4183 Ok(response.staged_text)
4184 }
4185 }
4186 });
4187 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
4188 }
4189
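    /// Loads the committed (HEAD) and staged (index) texts for the given path,
    /// collapsing them into `DiffBasesChange::SetBoth` when they are identical.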
4190 fn load_committed_text(
4191 &mut self,
4192 buffer_id: BufferId,
4193 repo_path: RepoPath,
4194 cx: &App,
4195 ) -> Task<Result<DiffBasesChange>> {
4196 let rx = self.send_job(None, move |state, _| async move {
4197 match state {
4198 RepositoryState::Local { backend, .. } => {
4199 let committed_text = backend.load_committed_text(repo_path.clone()).await;
4200 let staged_text = backend.load_index_text(repo_path).await;
4201 let diff_bases_change = if committed_text == staged_text {
4202 DiffBasesChange::SetBoth(committed_text)
4203 } else {
4204 DiffBasesChange::SetEach {
4205 index: staged_text,
4206 head: committed_text,
4207 }
4208 };
4209 anyhow::Ok(diff_bases_change)
4210 }
4211 RepositoryState::Remote { project_id, client } => {
4212 use proto::open_uncommitted_diff_response::Mode;
4213
4214 let response = client
4215 .request(proto::OpenUncommittedDiff {
4216 project_id: project_id.to_proto(),
4217 buffer_id: buffer_id.to_proto(),
4218 })
4219 .await?;
4220 let mode = Mode::from_i32(response.mode).context("Invalid mode")?;
4221 let bases = match mode {
4222 Mode::IndexMatchesHead => DiffBasesChange::SetBoth(response.committed_text),
4223 Mode::IndexAndHead => DiffBasesChange::SetEach {
4224 head: response.committed_text,
4225 index: response.staged_text,
4226 },
4227 };
4228 Ok(bases)
4229 }
4230 }
4231 });
4232
4233 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
4234 }
4235
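    /// Records that the given paths may have changed status and schedules a
    /// keyed job that re-runs `git status` for just those paths, applying only
    /// the entries that differ from the previous snapshot.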
4236 fn paths_changed(
4237 &mut self,
4238 paths: Vec<RepoPath>,
4239 updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
4240 cx: &mut Context<Self>,
4241 ) {
4242 self.paths_needing_status_update.extend(paths);
4243
4244 let this = cx.weak_entity();
4245 let _ = self.send_keyed_job(
4246 Some(GitJobKey::RefreshStatuses),
4247 None,
4248 |state, mut cx| async move {
4249 let (prev_snapshot, mut changed_paths) = this.update(&mut cx, |this, _| {
4250 (
4251 this.snapshot.clone(),
4252 mem::take(&mut this.paths_needing_status_update),
4253 )
4254 })?;
4255 let RepositoryState::Local { backend, .. } = state else {
4256 bail!("not a local repository")
4257 };
4258
4259 let paths = changed_paths.iter().cloned().collect::<Vec<_>>();
4260 let statuses = backend.status(&paths).await?;
4261
4262 let changed_path_statuses = cx
4263 .background_spawn(async move {
4264 let mut changed_path_statuses = Vec::new();
4265 let prev_statuses = prev_snapshot.statuses_by_path.clone();
4266 let mut cursor = prev_statuses.cursor::<PathProgress>(&());
4267
4268 for (repo_path, status) in &*statuses.entries {
4269 changed_paths.remove(repo_path);
4270 if cursor.seek_forward(&PathTarget::Path(repo_path), Bias::Left, &()) {
4271 if cursor.item().is_some_and(|entry| entry.status == *status) {
4272 continue;
4273 }
4274 }
4275
4276 changed_path_statuses.push(Edit::Insert(StatusEntry {
4277 repo_path: repo_path.clone(),
4278 status: *status,
4279 }));
4280 }
4281 let mut cursor = prev_statuses.cursor::<PathProgress>(&());
4282 for path in changed_paths.into_iter() {
4283 if cursor.seek_forward(&PathTarget::Path(&path), Bias::Left, &()) {
4284 changed_path_statuses.push(Edit::Remove(PathKey(path.0)));
4285 }
4286 }
4287 changed_path_statuses
4288 })
4289 .await;
4290
4291 this.update(&mut cx, |this, cx| {
4292 if !changed_path_statuses.is_empty() {
4293 this.snapshot
4294 .statuses_by_path
4295 .edit(changed_path_statuses, &());
4296 this.snapshot.scan_id += 1;
4297 if let Some(updates_tx) = updates_tx {
4298 updates_tx
4299 .unbounded_send(DownstreamUpdate::UpdateRepository(
4300 this.snapshot.clone(),
4301 ))
4302 .ok();
4303 }
4304 }
4305 cx.emit(RepositoryEvent::Updated { full_scan: false });
4306 })
4307 },
4308 );
4309 }
4310
    /// Returns information about the currently running git command, if any,
    /// including when it started.
4312 pub fn current_job(&self) -> Option<JobInfo> {
4313 self.active_jobs.values().next().cloned()
4314 }
4315
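    /// Enqueues a no-op job and returns a receiver that resolves once all
    /// previously enqueued jobs have completed.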
4316 pub fn barrier(&mut self) -> oneshot::Receiver<()> {
4317 self.send_job(None, |_, _| async {})
4318 }
4319}
4320
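/// Builds a permalink for a file that was unpacked into the Cargo registry's
/// source cache, recovering the upstream repository from the crate's
/// `Cargo.toml` and the exact commit and in-repo path from its
/// `.cargo_vcs_info.json`.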
4321fn get_permalink_in_rust_registry_src(
4322 provider_registry: Arc<GitHostingProviderRegistry>,
4323 path: PathBuf,
4324 selection: Range<u32>,
4325) -> Result<url::Url> {
4326 #[derive(Deserialize)]
4327 struct CargoVcsGit {
4328 sha1: String,
4329 }
4330
4331 #[derive(Deserialize)]
4332 struct CargoVcsInfo {
4333 git: CargoVcsGit,
4334 path_in_vcs: String,
4335 }
4336
4337 #[derive(Deserialize)]
4338 struct CargoPackage {
4339 repository: String,
4340 }
4341
4342 #[derive(Deserialize)]
4343 struct CargoToml {
4344 package: CargoPackage,
4345 }
4346
4347 let Some((dir, cargo_vcs_info_json)) = path.ancestors().skip(1).find_map(|dir| {
4348 let json = std::fs::read_to_string(dir.join(".cargo_vcs_info.json")).ok()?;
4349 Some((dir, json))
4350 }) else {
4351 bail!("No .cargo_vcs_info.json found in parent directories")
4352 };
4353 let cargo_vcs_info = serde_json::from_str::<CargoVcsInfo>(&cargo_vcs_info_json)?;
4354 let cargo_toml = std::fs::read_to_string(dir.join("Cargo.toml"))?;
4355 let manifest = toml::from_str::<CargoToml>(&cargo_toml)?;
4356 let (provider, remote) = parse_git_remote_url(provider_registry, &manifest.package.repository)
4357 .context("parsing package.repository field of manifest")?;
4358 let path = PathBuf::from(cargo_vcs_info.path_in_vcs).join(path.strip_prefix(dir).unwrap());
4359 let permalink = provider.build_permalink(
4360 remote,
4361 BuildPermalinkParams {
4362 sha: &cargo_vcs_info.git.sha1,
4363 path: &path.to_string_lossy(),
4364 selection: Some(selection),
4365 },
4366 );
4367 Ok(permalink)
4368}
4369
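/// Converts an optional `git blame` result into the protobuf response that is
/// sent to remote clients.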
4370fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
4371 let Some(blame) = blame else {
4372 return proto::BlameBufferResponse {
4373 blame_response: None,
4374 };
4375 };
4376
4377 let entries = blame
4378 .entries
4379 .into_iter()
4380 .map(|entry| proto::BlameEntry {
4381 sha: entry.sha.as_bytes().into(),
4382 start_line: entry.range.start,
4383 end_line: entry.range.end,
4384 original_line_number: entry.original_line_number,
4385 author: entry.author.clone(),
4386 author_mail: entry.author_mail.clone(),
4387 author_time: entry.author_time,
4388 author_tz: entry.author_tz.clone(),
4389 committer: entry.committer_name.clone(),
4390 committer_mail: entry.committer_email.clone(),
4391 committer_time: entry.committer_time,
4392 committer_tz: entry.committer_tz.clone(),
4393 summary: entry.summary.clone(),
4394 previous: entry.previous.clone(),
4395 filename: entry.filename.clone(),
4396 })
4397 .collect::<Vec<_>>();
4398
4399 let messages = blame
4400 .messages
4401 .into_iter()
4402 .map(|(oid, message)| proto::CommitMessage {
4403 oid: oid.as_bytes().into(),
4404 message,
4405 })
4406 .collect::<Vec<_>>();
4407
4408 proto::BlameBufferResponse {
4409 blame_response: Some(proto::blame_buffer_response::BlameResponse {
4410 entries,
4411 messages,
4412 remote_url: blame.remote_url,
4413 }),
4414 }
4415}
4416
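/// Reconstructs a `Blame` from a protobuf response, dropping any entries or
/// commit messages whose object ids fail to parse.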
4417fn deserialize_blame_buffer_response(
4418 response: proto::BlameBufferResponse,
4419) -> Option<git::blame::Blame> {
4420 let response = response.blame_response?;
4421 let entries = response
4422 .entries
4423 .into_iter()
4424 .filter_map(|entry| {
4425 Some(git::blame::BlameEntry {
4426 sha: git::Oid::from_bytes(&entry.sha).ok()?,
4427 range: entry.start_line..entry.end_line,
4428 original_line_number: entry.original_line_number,
4429 committer_name: entry.committer,
4430 committer_time: entry.committer_time,
4431 committer_tz: entry.committer_tz,
4432 committer_email: entry.committer_mail,
4433 author: entry.author,
4434 author_mail: entry.author_mail,
4435 author_time: entry.author_time,
4436 author_tz: entry.author_tz,
4437 summary: entry.summary,
4438 previous: entry.previous,
4439 filename: entry.filename,
4440 })
4441 })
4442 .collect::<Vec<_>>();
4443
4444 let messages = response
4445 .messages
4446 .into_iter()
4447 .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
4448 .collect::<HashMap<_, _>>();
4449
4450 Some(Blame {
4451 entries,
4452 messages,
4453 remote_url: response.remote_url,
4454 })
4455}
4456
4457fn branch_to_proto(branch: &git::repository::Branch) -> proto::Branch {
4458 proto::Branch {
4459 is_head: branch.is_head,
4460 ref_name: branch.ref_name.to_string(),
4461 unix_timestamp: branch
4462 .most_recent_commit
4463 .as_ref()
4464 .map(|commit| commit.commit_timestamp as u64),
4465 upstream: branch.upstream.as_ref().map(|upstream| proto::GitUpstream {
4466 ref_name: upstream.ref_name.to_string(),
4467 tracking: upstream
4468 .tracking
4469 .status()
4470 .map(|upstream| proto::UpstreamTracking {
4471 ahead: upstream.ahead as u64,
4472 behind: upstream.behind as u64,
4473 }),
4474 }),
4475 most_recent_commit: branch
4476 .most_recent_commit
4477 .as_ref()
4478 .map(|commit| proto::CommitSummary {
4479 sha: commit.sha.to_string(),
4480 subject: commit.subject.to_string(),
4481 commit_timestamp: commit.commit_timestamp,
4482 }),
4483 }
4484}
4485
4486fn proto_to_branch(proto: &proto::Branch) -> git::repository::Branch {
4487 git::repository::Branch {
4488 is_head: proto.is_head,
4489 ref_name: proto.ref_name.clone().into(),
4490 upstream: proto
4491 .upstream
4492 .as_ref()
4493 .map(|upstream| git::repository::Upstream {
4494 ref_name: upstream.ref_name.to_string().into(),
4495 tracking: upstream
4496 .tracking
4497 .as_ref()
4498 .map(|tracking| {
4499 git::repository::UpstreamTracking::Tracked(UpstreamTrackingStatus {
4500 ahead: tracking.ahead as u32,
4501 behind: tracking.behind as u32,
4502 })
4503 })
4504 .unwrap_or(git::repository::UpstreamTracking::Gone),
4505 }),
4506 most_recent_commit: proto.most_recent_commit.as_ref().map(|commit| {
4507 git::repository::CommitSummary {
4508 sha: commit.sha.to_string().into(),
4509 subject: commit.subject.to_string().into(),
4510 commit_timestamp: commit.commit_timestamp,
4511 has_parent: true,
4512 }
4513 }),
4514 }
4515}
4516
4517fn commit_details_to_proto(commit: &CommitDetails) -> proto::GitCommitDetails {
4518 proto::GitCommitDetails {
4519 sha: commit.sha.to_string(),
4520 message: commit.message.to_string(),
4521 commit_timestamp: commit.commit_timestamp,
4522 author_email: commit.author_email.to_string(),
4523 author_name: commit.author_name.to_string(),
4524 }
4525}
4526
4527fn proto_to_commit_details(proto: &proto::GitCommitDetails) -> CommitDetails {
4528 CommitDetails {
4529 sha: proto.sha.clone().into(),
4530 message: proto.message.clone().into(),
4531 commit_timestamp: proto.commit_timestamp,
4532 author_email: proto.author_email.clone().into(),
4533 author_name: proto.author_name.clone().into(),
4534 }
4535}
4536
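/// Recomputes a repository snapshot (branch, head commit, file statuses, and
/// merge state) from the backend, returning the new snapshot together with the
/// events describing how it differs from `prev_snapshot`.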
4537async fn compute_snapshot(
4538 id: RepositoryId,
4539 work_directory_abs_path: Arc<Path>,
4540 prev_snapshot: RepositorySnapshot,
4541 backend: Arc<dyn GitRepository>,
4542) -> Result<(RepositorySnapshot, Vec<RepositoryEvent>)> {
4543 let mut events = Vec::new();
4544 let branches = backend.branches().await?;
4545 let branch = branches.into_iter().find(|branch| branch.is_head);
4546 let statuses = backend.status(&[WORK_DIRECTORY_REPO_PATH.clone()]).await?;
4547 let statuses_by_path = SumTree::from_iter(
4548 statuses
4549 .entries
4550 .iter()
4551 .map(|(repo_path, status)| StatusEntry {
4552 repo_path: repo_path.clone(),
4553 status: *status,
4554 }),
4555 &(),
4556 );
4557 let (merge_details, merge_heads_changed) =
4558 MergeDetails::load(&backend, &statuses_by_path, &prev_snapshot).await?;
4559 log::debug!("new merge details (changed={merge_heads_changed:?}): {merge_details:?}");
4560
4561 if merge_heads_changed
4562 || branch != prev_snapshot.branch
4563 || statuses_by_path != prev_snapshot.statuses_by_path
4564 {
4565 events.push(RepositoryEvent::Updated { full_scan: true });
4566 }
4567
    // Merge conflict paths are cached so that staging or unstaging files doesn't
    // change them; they are only recomputed when the merge heads change (at
    // commit time, etc.).
4570 if merge_heads_changed {
4571 events.push(RepositoryEvent::MergeHeadsChanged);
4572 }
4573
    // Load the head commit directly; this is useful when `branch` is `None`,
    // e.g. in a detached HEAD state.
4575 let head_commit = match backend.head_sha().await {
4576 Some(head_sha) => backend.show(head_sha).await.log_err(),
4577 None => None,
4578 };
4579
4580 let snapshot = RepositorySnapshot {
4581 id,
4582 statuses_by_path,
4583 work_directory_abs_path,
4584 scan_id: prev_snapshot.scan_id + 1,
4585 branch,
4586 head_commit,
4587 merge: merge_details,
4588 };
4589
4590 Ok((snapshot, events))
4591}
4592
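/// Converts a protobuf file status into a `FileStatus`, falling back to the
/// legacy `simple_status` code when no structured variant is present.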
4593fn status_from_proto(
4594 simple_status: i32,
4595 status: Option<proto::GitFileStatus>,
4596) -> anyhow::Result<FileStatus> {
4597 use proto::git_file_status::Variant;
4598
4599 let Some(variant) = status.and_then(|status| status.variant) else {
4600 let code = proto::GitStatus::from_i32(simple_status)
4601 .with_context(|| format!("Invalid git status code: {simple_status}"))?;
4602 let result = match code {
4603 proto::GitStatus::Added => TrackedStatus {
4604 worktree_status: StatusCode::Added,
4605 index_status: StatusCode::Unmodified,
4606 }
4607 .into(),
4608 proto::GitStatus::Modified => TrackedStatus {
4609 worktree_status: StatusCode::Modified,
4610 index_status: StatusCode::Unmodified,
4611 }
4612 .into(),
4613 proto::GitStatus::Conflict => UnmergedStatus {
4614 first_head: UnmergedStatusCode::Updated,
4615 second_head: UnmergedStatusCode::Updated,
4616 }
4617 .into(),
4618 proto::GitStatus::Deleted => TrackedStatus {
4619 worktree_status: StatusCode::Deleted,
4620 index_status: StatusCode::Unmodified,
4621 }
4622 .into(),
4623 _ => anyhow::bail!("Invalid code for simple status: {simple_status}"),
4624 };
4625 return Ok(result);
4626 };
4627
4628 let result = match variant {
4629 Variant::Untracked(_) => FileStatus::Untracked,
4630 Variant::Ignored(_) => FileStatus::Ignored,
4631 Variant::Unmerged(unmerged) => {
4632 let [first_head, second_head] =
4633 [unmerged.first_head, unmerged.second_head].map(|head| {
4634 let code = proto::GitStatus::from_i32(head)
4635 .with_context(|| format!("Invalid git status code: {head}"))?;
4636 let result = match code {
4637 proto::GitStatus::Added => UnmergedStatusCode::Added,
4638 proto::GitStatus::Updated => UnmergedStatusCode::Updated,
4639 proto::GitStatus::Deleted => UnmergedStatusCode::Deleted,
4640 _ => anyhow::bail!("Invalid code for unmerged status: {code:?}"),
4641 };
4642 Ok(result)
4643 });
4644 let [first_head, second_head] = [first_head?, second_head?];
4645 UnmergedStatus {
4646 first_head,
4647 second_head,
4648 }
4649 .into()
4650 }
4651 Variant::Tracked(tracked) => {
4652 let [index_status, worktree_status] = [tracked.index_status, tracked.worktree_status]
4653 .map(|status| {
4654 let code = proto::GitStatus::from_i32(status)
4655 .with_context(|| format!("Invalid git status code: {status}"))?;
4656 let result = match code {
4657 proto::GitStatus::Modified => StatusCode::Modified,
4658 proto::GitStatus::TypeChanged => StatusCode::TypeChanged,
4659 proto::GitStatus::Added => StatusCode::Added,
4660 proto::GitStatus::Deleted => StatusCode::Deleted,
4661 proto::GitStatus::Renamed => StatusCode::Renamed,
4662 proto::GitStatus::Copied => StatusCode::Copied,
4663 proto::GitStatus::Unmodified => StatusCode::Unmodified,
4664 _ => anyhow::bail!("Invalid code for tracked status: {code:?}"),
4665 };
4666 Ok(result)
4667 });
4668 let [index_status, worktree_status] = [index_status?, worktree_status?];
4669 TrackedStatus {
4670 index_status,
4671 worktree_status,
4672 }
4673 .into()
4674 }
4675 };
4676 Ok(result)
4677}
4678
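/// Converts a `FileStatus` into its structured protobuf representation.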
4679fn status_to_proto(status: FileStatus) -> proto::GitFileStatus {
4680 use proto::git_file_status::{Tracked, Unmerged, Variant};
4681
4682 let variant = match status {
4683 FileStatus::Untracked => Variant::Untracked(Default::default()),
4684 FileStatus::Ignored => Variant::Ignored(Default::default()),
4685 FileStatus::Unmerged(UnmergedStatus {
4686 first_head,
4687 second_head,
4688 }) => Variant::Unmerged(Unmerged {
4689 first_head: unmerged_status_to_proto(first_head),
4690 second_head: unmerged_status_to_proto(second_head),
4691 }),
4692 FileStatus::Tracked(TrackedStatus {
4693 index_status,
4694 worktree_status,
4695 }) => Variant::Tracked(Tracked {
4696 index_status: tracked_status_to_proto(index_status),
4697 worktree_status: tracked_status_to_proto(worktree_status),
4698 }),
4699 };
4700 proto::GitFileStatus {
4701 variant: Some(variant),
4702 }
4703}
4704
4705fn unmerged_status_to_proto(code: UnmergedStatusCode) -> i32 {
4706 match code {
4707 UnmergedStatusCode::Added => proto::GitStatus::Added as _,
4708 UnmergedStatusCode::Deleted => proto::GitStatus::Deleted as _,
4709 UnmergedStatusCode::Updated => proto::GitStatus::Updated as _,
4710 }
4711}
4712
4713fn tracked_status_to_proto(code: StatusCode) -> i32 {
4714 match code {
4715 StatusCode::Added => proto::GitStatus::Added as _,
4716 StatusCode::Deleted => proto::GitStatus::Deleted as _,
4717 StatusCode::Modified => proto::GitStatus::Modified as _,
4718 StatusCode::Renamed => proto::GitStatus::Renamed as _,
4719 StatusCode::TypeChanged => proto::GitStatus::TypeChanged as _,
4720 StatusCode::Copied => proto::GitStatus::Copied as _,
4721 StatusCode::Unmodified => proto::GitStatus::Unmodified as _,
4722 }
4723}