1mod conflict_set;
2pub mod git_traversal;
3
4use crate::{
5 ProjectEnvironment, ProjectItem, ProjectPath,
6 buffer_store::{BufferStore, BufferStoreEvent},
7 worktree_store::{WorktreeStore, WorktreeStoreEvent},
8};
9use anyhow::{Context as _, Result, anyhow, bail};
10use askpass::AskPassDelegate;
11use buffer_diff::{BufferDiff, BufferDiffEvent};
12use client::ProjectId;
13use collections::HashMap;
14pub use conflict_set::{ConflictRegion, ConflictSet, ConflictSetSnapshot, ConflictSetUpdate};
15use fs::Fs;
16use futures::{
17 FutureExt, StreamExt as _,
18 channel::{mpsc, oneshot},
19 future::{self, Shared},
20};
21use git::{
22 BuildPermalinkParams, GitHostingProviderRegistry, WORK_DIRECTORY_REPO_PATH,
23 blame::Blame,
24 parse_git_remote_url,
25 repository::{
26 Branch, CommitDetails, CommitDiff, CommitFile, CommitOptions, DiffType, FetchOptions,
27 GitRepository, GitRepositoryCheckpoint, PushOptions, Remote, RemoteCommandOutput, RepoPath,
28 ResetMode, UpstreamTrackingStatus,
29 },
30 status::{
31 FileStatus, GitSummary, StatusCode, TrackedStatus, UnmergedStatus, UnmergedStatusCode,
32 },
33};
34use gpui::{
35 App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Subscription, Task,
36 WeakEntity,
37};
38use language::{
39 Buffer, BufferEvent, Language, LanguageRegistry,
40 proto::{deserialize_version, serialize_version},
41};
42use parking_lot::Mutex;
43use postage::stream::Stream as _;
44use rpc::{
45 AnyProtoClient, TypedEnvelope,
46 proto::{self, FromProto, SSH_PROJECT_ID, ToProto, git_reset, split_repository_update},
47};
48use serde::Deserialize;
49use std::{
50 cmp::Ordering,
51 collections::{BTreeSet, VecDeque},
52 future::Future,
53 mem,
54 ops::Range,
55 path::{Path, PathBuf},
56 sync::{
57 Arc,
58 atomic::{self, AtomicU64},
59 },
60 time::Instant,
61};
62use sum_tree::{Edit, SumTree, TreeSet};
63use text::{Bias, BufferId};
64use util::{ResultExt, debug_panic, post_inc};
65use worktree::{
66 File, PathKey, PathProgress, PathSummary, PathTarget, UpdatedGitRepositoriesSet,
67 UpdatedGitRepository, Worktree,
68};
69
70pub struct GitStore {
71 state: GitStoreState,
72 buffer_store: Entity<BufferStore>,
73 worktree_store: Entity<WorktreeStore>,
74 repositories: HashMap<RepositoryId, Entity<Repository>>,
75 active_repo_id: Option<RepositoryId>,
76 #[allow(clippy::type_complexity)]
77 loading_diffs:
78 HashMap<(BufferId, DiffKind), Shared<Task<Result<Entity<BufferDiff>, Arc<anyhow::Error>>>>>,
79 diffs: HashMap<BufferId, Entity<BufferGitState>>,
80 shared_diffs: HashMap<proto::PeerId, HashMap<BufferId, SharedDiffs>>,
81 _subscriptions: Vec<Subscription>,
82}
83
84#[derive(Default)]
85struct SharedDiffs {
86 unstaged: Option<Entity<BufferDiff>>,
87 uncommitted: Option<Entity<BufferDiff>>,
88}
89
90struct BufferGitState {
91 unstaged_diff: Option<WeakEntity<BufferDiff>>,
92 uncommitted_diff: Option<WeakEntity<BufferDiff>>,
93 conflict_set: Option<WeakEntity<ConflictSet>>,
94 recalculate_diff_task: Option<Task<Result<()>>>,
95 reparse_conflict_markers_task: Option<Task<Result<()>>>,
96 language: Option<Arc<Language>>,
97 language_registry: Option<Arc<LanguageRegistry>>,
98 conflict_updated_futures: Vec<oneshot::Sender<()>>,
99 recalculating_tx: postage::watch::Sender<bool>,
100
    /// These operation counts are used to ensure that head and index text
    /// values read from the git repository are up-to-date with any hunk staging
    /// operations that have been performed on the BufferDiff.
    ///
    /// The operation count is incremented immediately when the user initiates a
    /// hunk stage/unstage operation. Then, upon finishing writing the new index
    /// text to disk, the `operation count as of write` is updated to reflect
    /// the operation count that prompted the write.
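    ///
    /// A minimal sketch of the intended ordering (illustrative only; the write
    /// itself happens inside a git job, and `write_index_text_to_disk` is a
    /// hypothetical stand-in rather than a real method):
    ///
    /// ```ignore
    /// state.hunk_staging_operation_count += 1;                    // user stages a hunk
    /// let op_count = state.hunk_staging_operation_count;          // captured at initiation
    /// write_index_text_to_disk(new_index_text).await?;            // index write finishes
    /// state.hunk_staging_operation_count_as_of_write = op_count;  // record what it covers
    /// ```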
109 hunk_staging_operation_count: usize,
110 hunk_staging_operation_count_as_of_write: usize,
111
112 head_text: Option<Arc<String>>,
113 index_text: Option<Arc<String>>,
114 head_changed: bool,
115 index_changed: bool,
116 language_changed: bool,
117}
118
119#[derive(Clone, Debug)]
120enum DiffBasesChange {
121 SetIndex(Option<String>),
122 SetHead(Option<String>),
123 SetEach {
124 index: Option<String>,
125 head: Option<String>,
126 },
127 SetBoth(Option<String>),
128}
129
130#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
131enum DiffKind {
132 Unstaged,
133 Uncommitted,
134}
135
136enum GitStoreState {
137 Local {
138 next_repository_id: Arc<AtomicU64>,
139 downstream: Option<LocalDownstreamState>,
140 project_environment: Entity<ProjectEnvironment>,
141 fs: Arc<dyn Fs>,
142 },
143 Ssh {
144 upstream_client: AnyProtoClient,
145 upstream_project_id: ProjectId,
146 downstream: Option<(AnyProtoClient, ProjectId)>,
147 },
148 Remote {
149 upstream_client: AnyProtoClient,
150 upstream_project_id: ProjectId,
151 },
152}
153
154enum DownstreamUpdate {
155 UpdateRepository(RepositorySnapshot),
156 RemoveRepository(RepositoryId),
157}
158
159struct LocalDownstreamState {
160 client: AnyProtoClient,
161 project_id: ProjectId,
162 updates_tx: mpsc::UnboundedSender<DownstreamUpdate>,
163 _task: Task<Result<()>>,
164}
165
166#[derive(Clone, Debug)]
167pub struct GitStoreCheckpoint {
168 checkpoints_by_work_dir_abs_path: HashMap<Arc<Path>, GitRepositoryCheckpoint>,
169}
170
171#[derive(Clone, Debug, PartialEq, Eq)]
172pub struct StatusEntry {
173 pub repo_path: RepoPath,
174 pub status: FileStatus,
175}
176
177impl StatusEntry {
178 fn to_proto(&self) -> proto::StatusEntry {
179 let simple_status = match self.status {
180 FileStatus::Ignored | FileStatus::Untracked => proto::GitStatus::Added as i32,
181 FileStatus::Unmerged { .. } => proto::GitStatus::Conflict as i32,
182 FileStatus::Tracked(TrackedStatus {
183 index_status,
184 worktree_status,
185 }) => tracked_status_to_proto(if worktree_status != StatusCode::Unmodified {
186 worktree_status
187 } else {
188 index_status
189 }),
190 };
191
192 proto::StatusEntry {
193 repo_path: self.repo_path.as_ref().to_proto(),
194 simple_status,
195 status: Some(status_to_proto(self.status)),
196 }
197 }
198}
199
200impl TryFrom<proto::StatusEntry> for StatusEntry {
201 type Error = anyhow::Error;
202
203 fn try_from(value: proto::StatusEntry) -> Result<Self, Self::Error> {
204 let repo_path = RepoPath(Arc::<Path>::from_proto(value.repo_path));
205 let status = status_from_proto(value.simple_status, value.status)?;
206 Ok(Self { repo_path, status })
207 }
208}
209
210impl sum_tree::Item for StatusEntry {
211 type Summary = PathSummary<GitSummary>;
212
213 fn summary(&self, _: &<Self::Summary as sum_tree::Summary>::Context) -> Self::Summary {
214 PathSummary {
215 max_path: self.repo_path.0.clone(),
216 item_summary: self.status.summary(),
217 }
218 }
219}
220
221impl sum_tree::KeyedItem for StatusEntry {
222 type Key = PathKey;
223
224 fn key(&self) -> Self::Key {
225 PathKey(self.repo_path.0.clone())
226 }
227}
228
229#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
230pub struct RepositoryId(pub u64);
231
232#[derive(Clone, Debug, Default, PartialEq, Eq)]
233pub struct MergeDetails {
234 pub conflicted_paths: TreeSet<RepoPath>,
235 pub message: Option<SharedString>,
236 pub heads: Vec<Option<SharedString>>,
237}
238
239#[derive(Clone, Debug, PartialEq, Eq)]
240pub struct RepositorySnapshot {
241 pub id: RepositoryId,
242 pub statuses_by_path: SumTree<StatusEntry>,
243 pub work_directory_abs_path: Arc<Path>,
244 pub branch: Option<Branch>,
245 pub head_commit: Option<CommitDetails>,
246 pub scan_id: u64,
247 pub merge: MergeDetails,
248}
249
250type JobId = u64;
251
252#[derive(Clone, Debug, PartialEq, Eq)]
253pub struct JobInfo {
254 pub start: Instant,
255 pub message: SharedString,
256}
257
258pub struct Repository {
259 this: WeakEntity<Self>,
260 snapshot: RepositorySnapshot,
261 commit_message_buffer: Option<Entity<Buffer>>,
262 git_store: WeakEntity<GitStore>,
263 // For a local repository, holds paths that have had worktree events since the last status scan completed,
264 // and that should be examined during the next status scan.
265 paths_needing_status_update: BTreeSet<RepoPath>,
266 job_sender: mpsc::UnboundedSender<GitJob>,
267 active_jobs: HashMap<JobId, JobInfo>,
268 job_id: JobId,
269 askpass_delegates: Arc<Mutex<HashMap<u64, AskPassDelegate>>>,
270 latest_askpass_id: u64,
271}
272
273impl std::ops::Deref for Repository {
274 type Target = RepositorySnapshot;
275
276 fn deref(&self) -> &Self::Target {
277 &self.snapshot
278 }
279}
280
281#[derive(Clone)]
282pub enum RepositoryState {
283 Local {
284 backend: Arc<dyn GitRepository>,
285 environment: Arc<HashMap<String, String>>,
286 },
287 Remote {
288 project_id: ProjectId,
289 client: AnyProtoClient,
290 },
291}
292
293#[derive(Clone, Debug)]
294pub enum RepositoryEvent {
295 Updated { full_scan: bool, new_instance: bool },
296 MergeHeadsChanged,
297}
298
299#[derive(Clone, Debug)]
300pub struct JobsUpdated;
301
302#[derive(Debug)]
303pub enum GitStoreEvent {
304 ActiveRepositoryChanged(Option<RepositoryId>),
305 RepositoryUpdated(RepositoryId, RepositoryEvent, bool),
306 RepositoryAdded(RepositoryId),
307 RepositoryRemoved(RepositoryId),
308 IndexWriteError(anyhow::Error),
309 JobsUpdated,
310 ConflictsUpdated,
311}
312
313impl EventEmitter<RepositoryEvent> for Repository {}
314impl EventEmitter<JobsUpdated> for Repository {}
315impl EventEmitter<GitStoreEvent> for GitStore {}
316
317pub struct GitJob {
318 job: Box<dyn FnOnce(RepositoryState, &mut AsyncApp) -> Task<()>>,
319 key: Option<GitJobKey>,
320}
321
322#[derive(PartialEq, Eq)]
323enum GitJobKey {
324 WriteIndex(RepoPath),
325 ReloadBufferDiffBases,
326 RefreshStatuses,
327 ReloadGitState,
328}
329
330impl GitStore {
331 pub fn local(
332 worktree_store: &Entity<WorktreeStore>,
333 buffer_store: Entity<BufferStore>,
334 environment: Entity<ProjectEnvironment>,
335 fs: Arc<dyn Fs>,
336 cx: &mut Context<Self>,
337 ) -> Self {
338 Self::new(
339 worktree_store.clone(),
340 buffer_store,
341 GitStoreState::Local {
342 next_repository_id: Arc::new(AtomicU64::new(1)),
343 downstream: None,
344 project_environment: environment,
345 fs,
346 },
347 cx,
348 )
349 }
350
351 pub fn remote(
352 worktree_store: &Entity<WorktreeStore>,
353 buffer_store: Entity<BufferStore>,
354 upstream_client: AnyProtoClient,
355 project_id: ProjectId,
356 cx: &mut Context<Self>,
357 ) -> Self {
358 Self::new(
359 worktree_store.clone(),
360 buffer_store,
361 GitStoreState::Remote {
362 upstream_client,
363 upstream_project_id: project_id,
364 },
365 cx,
366 )
367 }
368
369 pub fn ssh(
370 worktree_store: &Entity<WorktreeStore>,
371 buffer_store: Entity<BufferStore>,
372 upstream_client: AnyProtoClient,
373 cx: &mut Context<Self>,
374 ) -> Self {
375 Self::new(
376 worktree_store.clone(),
377 buffer_store,
378 GitStoreState::Ssh {
379 upstream_client,
380 upstream_project_id: ProjectId(SSH_PROJECT_ID),
381 downstream: None,
382 },
383 cx,
384 )
385 }
386
387 fn new(
388 worktree_store: Entity<WorktreeStore>,
389 buffer_store: Entity<BufferStore>,
390 state: GitStoreState,
391 cx: &mut Context<Self>,
392 ) -> Self {
393 let _subscriptions = vec![
394 cx.subscribe(&worktree_store, Self::on_worktree_store_event),
395 cx.subscribe(&buffer_store, Self::on_buffer_store_event),
396 ];
397
398 GitStore {
399 state,
400 buffer_store,
401 worktree_store,
402 repositories: HashMap::default(),
403 active_repo_id: None,
404 _subscriptions,
405 loading_diffs: HashMap::default(),
406 shared_diffs: HashMap::default(),
407 diffs: HashMap::default(),
408 }
409 }
410
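    /// Registers the git-related RPC request and message handlers on the
    /// given client so that remote git operations are routed to this entity.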
411 pub fn init(client: &AnyProtoClient) {
412 client.add_entity_request_handler(Self::handle_get_remotes);
413 client.add_entity_request_handler(Self::handle_get_branches);
414 client.add_entity_request_handler(Self::handle_change_branch);
415 client.add_entity_request_handler(Self::handle_create_branch);
416 client.add_entity_request_handler(Self::handle_git_init);
417 client.add_entity_request_handler(Self::handle_push);
418 client.add_entity_request_handler(Self::handle_pull);
419 client.add_entity_request_handler(Self::handle_fetch);
420 client.add_entity_request_handler(Self::handle_stage);
421 client.add_entity_request_handler(Self::handle_unstage);
422 client.add_entity_request_handler(Self::handle_commit);
423 client.add_entity_request_handler(Self::handle_reset);
424 client.add_entity_request_handler(Self::handle_show);
425 client.add_entity_request_handler(Self::handle_load_commit_diff);
426 client.add_entity_request_handler(Self::handle_checkout_files);
427 client.add_entity_request_handler(Self::handle_open_commit_message_buffer);
428 client.add_entity_request_handler(Self::handle_set_index_text);
429 client.add_entity_request_handler(Self::handle_askpass);
430 client.add_entity_request_handler(Self::handle_check_for_pushed_commits);
431 client.add_entity_request_handler(Self::handle_git_diff);
432 client.add_entity_request_handler(Self::handle_open_unstaged_diff);
433 client.add_entity_request_handler(Self::handle_open_uncommitted_diff);
434 client.add_entity_message_handler(Self::handle_update_diff_bases);
435 client.add_entity_request_handler(Self::handle_get_permalink_to_line);
436 client.add_entity_request_handler(Self::handle_blame_buffer);
437 client.add_entity_message_handler(Self::handle_update_repository);
438 client.add_entity_message_handler(Self::handle_remove_repository);
439 }
440
441 pub fn is_local(&self) -> bool {
442 matches!(self.state, GitStoreState::Local { .. })
443 }
444
445 pub fn shared(&mut self, project_id: u64, client: AnyProtoClient, cx: &mut Context<Self>) {
446 match &mut self.state {
447 GitStoreState::Ssh {
448 downstream: downstream_client,
449 ..
450 } => {
451 for repo in self.repositories.values() {
452 let update = repo.read(cx).snapshot.initial_update(project_id);
453 for update in split_repository_update(update) {
454 client.send(update).log_err();
455 }
456 }
457 *downstream_client = Some((client, ProjectId(project_id)));
458 }
459 GitStoreState::Local {
460 downstream: downstream_client,
461 ..
462 } => {
463 let mut snapshots = HashMap::default();
464 let (updates_tx, mut updates_rx) = mpsc::unbounded();
465 for repo in self.repositories.values() {
466 updates_tx
467 .unbounded_send(DownstreamUpdate::UpdateRepository(
468 repo.read(cx).snapshot.clone(),
469 ))
470 .ok();
471 }
472 *downstream_client = Some(LocalDownstreamState {
473 client: client.clone(),
474 project_id: ProjectId(project_id),
475 updates_tx,
476 _task: cx.spawn(async move |this, cx| {
477 cx.background_spawn(async move {
478 while let Some(update) = updates_rx.next().await {
479 match update {
480 DownstreamUpdate::UpdateRepository(snapshot) => {
481 if let Some(old_snapshot) = snapshots.get_mut(&snapshot.id)
482 {
483 let update =
484 snapshot.build_update(old_snapshot, project_id);
485 *old_snapshot = snapshot;
486 for update in split_repository_update(update) {
487 client.send(update)?;
488 }
489 } else {
490 let update = snapshot.initial_update(project_id);
491 for update in split_repository_update(update) {
492 client.send(update)?;
493 }
494 snapshots.insert(snapshot.id, snapshot);
495 }
496 }
497 DownstreamUpdate::RemoveRepository(id) => {
498 client.send(proto::RemoveRepository {
499 project_id,
500 id: id.to_proto(),
501 })?;
502 }
503 }
504 }
505 anyhow::Ok(())
506 })
507 .await
508 .ok();
509 this.update(cx, |this, _| {
510 if let GitStoreState::Local {
511 downstream: downstream_client,
512 ..
513 } = &mut this.state
514 {
515 downstream_client.take();
516 } else {
                                unreachable!("downstream sharing task ran on a non-local store");
518 }
519 })
520 }),
521 });
522 }
523 GitStoreState::Remote { .. } => {
524 debug_panic!("shared called on remote store");
525 }
526 }
527 }
528
529 pub fn unshared(&mut self, _cx: &mut Context<Self>) {
530 match &mut self.state {
531 GitStoreState::Local {
532 downstream: downstream_client,
533 ..
534 } => {
535 downstream_client.take();
536 }
537 GitStoreState::Ssh {
538 downstream: downstream_client,
539 ..
540 } => {
541 downstream_client.take();
542 }
543 GitStoreState::Remote { .. } => {
544 debug_panic!("unshared called on remote store");
545 }
546 }
547 self.shared_diffs.clear();
548 }
549
550 pub(crate) fn forget_shared_diffs_for(&mut self, peer_id: &proto::PeerId) {
551 self.shared_diffs.remove(peer_id);
552 }
553
554 pub fn active_repository(&self) -> Option<Entity<Repository>> {
555 self.active_repo_id
556 .as_ref()
557 .map(|id| self.repositories[&id].clone())
558 }
559
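    /// Returns a diff of the buffer's contents against its staged (index)
    /// text, reusing a cached diff when one has already been loaded.
    ///
    /// A hedged usage sketch (assumes `git_store: Entity<GitStore>` and
    /// `buffer: Entity<Buffer>` are in scope; error handling elided):
    ///
    /// ```ignore
    /// let unstaged_diff = git_store
    ///     .update(cx, |store, cx| store.open_unstaged_diff(buffer.clone(), cx))
    ///     .await?;
    /// ```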
560 pub fn open_unstaged_diff(
561 &mut self,
562 buffer: Entity<Buffer>,
563 cx: &mut Context<Self>,
564 ) -> Task<Result<Entity<BufferDiff>>> {
565 let buffer_id = buffer.read(cx).remote_id();
566 if let Some(diff_state) = self.diffs.get(&buffer_id) {
567 if let Some(unstaged_diff) = diff_state
568 .read(cx)
569 .unstaged_diff
570 .as_ref()
571 .and_then(|weak| weak.upgrade())
572 {
573 if let Some(task) =
574 diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
575 {
576 return cx.background_executor().spawn(async move {
577 task.await;
578 Ok(unstaged_diff)
579 });
580 }
581 return Task::ready(Ok(unstaged_diff));
582 }
583 }
584
585 let Some((repo, repo_path)) =
586 self.repository_and_path_for_buffer_id(buffer.read(cx).remote_id(), cx)
587 else {
588 return Task::ready(Err(anyhow!("failed to find git repository for buffer")));
589 };
590
591 let task = self
592 .loading_diffs
593 .entry((buffer_id, DiffKind::Unstaged))
594 .or_insert_with(|| {
595 let staged_text = repo.update(cx, |repo, cx| {
596 repo.load_staged_text(buffer_id, repo_path, cx)
597 });
598 cx.spawn(async move |this, cx| {
599 Self::open_diff_internal(
600 this,
601 DiffKind::Unstaged,
602 staged_text.await.map(DiffBasesChange::SetIndex),
603 buffer,
604 cx,
605 )
606 .await
607 .map_err(Arc::new)
608 })
609 .shared()
610 })
611 .clone();
612
613 cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
614 }
615
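    /// Returns a diff of the buffer's contents against its committed (HEAD)
    /// text, with the unstaged diff attached as the secondary diff. Usage
    /// mirrors [`Self::open_unstaged_diff`] above.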
616 pub fn open_uncommitted_diff(
617 &mut self,
618 buffer: Entity<Buffer>,
619 cx: &mut Context<Self>,
620 ) -> Task<Result<Entity<BufferDiff>>> {
621 let buffer_id = buffer.read(cx).remote_id();
622
623 if let Some(diff_state) = self.diffs.get(&buffer_id) {
624 if let Some(uncommitted_diff) = diff_state
625 .read(cx)
626 .uncommitted_diff
627 .as_ref()
628 .and_then(|weak| weak.upgrade())
629 {
630 if let Some(task) =
631 diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
632 {
633 return cx.background_executor().spawn(async move {
634 task.await;
635 Ok(uncommitted_diff)
636 });
637 }
638 return Task::ready(Ok(uncommitted_diff));
639 }
640 }
641
642 let Some((repo, repo_path)) =
643 self.repository_and_path_for_buffer_id(buffer.read(cx).remote_id(), cx)
644 else {
645 return Task::ready(Err(anyhow!("failed to find git repository for buffer")));
646 };
647
648 let task = self
649 .loading_diffs
650 .entry((buffer_id, DiffKind::Uncommitted))
651 .or_insert_with(|| {
652 let changes = repo.update(cx, |repo, cx| {
653 repo.load_committed_text(buffer_id, repo_path, cx)
654 });
655
656 cx.spawn(async move |this, cx| {
657 Self::open_diff_internal(this, DiffKind::Uncommitted, changes.await, buffer, cx)
658 .await
659 .map_err(Arc::new)
660 })
661 .shared()
662 })
663 .clone();
664
665 cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
666 }
667
668 async fn open_diff_internal(
669 this: WeakEntity<Self>,
670 kind: DiffKind,
671 texts: Result<DiffBasesChange>,
672 buffer_entity: Entity<Buffer>,
673 cx: &mut AsyncApp,
674 ) -> Result<Entity<BufferDiff>> {
675 let diff_bases_change = match texts {
676 Err(e) => {
677 this.update(cx, |this, cx| {
678 let buffer = buffer_entity.read(cx);
679 let buffer_id = buffer.remote_id();
680 this.loading_diffs.remove(&(buffer_id, kind));
681 })?;
682 return Err(e);
683 }
684 Ok(change) => change,
685 };
686
687 this.update(cx, |this, cx| {
688 let buffer = buffer_entity.read(cx);
689 let buffer_id = buffer.remote_id();
690 let language = buffer.language().cloned();
691 let language_registry = buffer.language_registry();
692 let text_snapshot = buffer.text_snapshot();
693 this.loading_diffs.remove(&(buffer_id, kind));
694
695 let git_store = cx.weak_entity();
696 let diff_state = this
697 .diffs
698 .entry(buffer_id)
699 .or_insert_with(|| cx.new(|_| BufferGitState::new(git_store)));
700
701 let diff = cx.new(|cx| BufferDiff::new(&text_snapshot, cx));
702
703 cx.subscribe(&diff, Self::on_buffer_diff_event).detach();
704 diff_state.update(cx, |diff_state, cx| {
705 diff_state.language = language;
706 diff_state.language_registry = language_registry;
707
708 match kind {
709 DiffKind::Unstaged => diff_state.unstaged_diff = Some(diff.downgrade()),
710 DiffKind::Uncommitted => {
711 let unstaged_diff = if let Some(diff) = diff_state.unstaged_diff() {
712 diff
713 } else {
714 let unstaged_diff = cx.new(|cx| BufferDiff::new(&text_snapshot, cx));
715 diff_state.unstaged_diff = Some(unstaged_diff.downgrade());
716 unstaged_diff
717 };
718
719 diff.update(cx, |diff, _| diff.set_secondary_diff(unstaged_diff));
720 diff_state.uncommitted_diff = Some(diff.downgrade())
721 }
722 }
723
724 diff_state.diff_bases_changed(text_snapshot, Some(diff_bases_change), cx);
725 let rx = diff_state.wait_for_recalculation();
726
727 anyhow::Ok(async move {
728 if let Some(rx) = rx {
729 rx.await;
730 }
731 Ok(diff)
732 })
733 })
734 })??
735 .await
736 }
737
738 pub fn get_unstaged_diff(&self, buffer_id: BufferId, cx: &App) -> Option<Entity<BufferDiff>> {
739 let diff_state = self.diffs.get(&buffer_id)?;
740 diff_state.read(cx).unstaged_diff.as_ref()?.upgrade()
741 }
742
743 pub fn get_uncommitted_diff(
744 &self,
745 buffer_id: BufferId,
746 cx: &App,
747 ) -> Option<Entity<BufferDiff>> {
748 let diff_state = self.diffs.get(&buffer_id)?;
749 diff_state.read(cx).uncommitted_diff.as_ref()?.upgrade()
750 }
751
752 pub fn open_conflict_set(
753 &mut self,
754 buffer: Entity<Buffer>,
755 cx: &mut Context<Self>,
756 ) -> Entity<ConflictSet> {
757 log::debug!("open conflict set");
758 let buffer_id = buffer.read(cx).remote_id();
759
760 if let Some(git_state) = self.diffs.get(&buffer_id) {
761 if let Some(conflict_set) = git_state
762 .read(cx)
763 .conflict_set
764 .as_ref()
765 .and_then(|weak| weak.upgrade())
766 {
767 let conflict_set = conflict_set.clone();
768 let buffer_snapshot = buffer.read(cx).text_snapshot();
769
770 git_state.update(cx, |state, cx| {
771 let _ = state.reparse_conflict_markers(buffer_snapshot, cx);
772 });
773
774 return conflict_set;
775 }
776 }
777
778 let is_unmerged = self
779 .repository_and_path_for_buffer_id(buffer_id, cx)
780 .map_or(false, |(repo, path)| {
781 repo.read(cx).snapshot.has_conflict(&path)
782 });
783 let git_store = cx.weak_entity();
784 let buffer_git_state = self
785 .diffs
786 .entry(buffer_id)
787 .or_insert_with(|| cx.new(|_| BufferGitState::new(git_store)));
788 let conflict_set = cx.new(|cx| ConflictSet::new(buffer_id, is_unmerged, cx));
789
790 self._subscriptions
791 .push(cx.subscribe(&conflict_set, |_, _, _, cx| {
792 cx.emit(GitStoreEvent::ConflictsUpdated);
793 }));
794
795 buffer_git_state.update(cx, |state, cx| {
796 state.conflict_set = Some(conflict_set.downgrade());
797 let buffer_snapshot = buffer.read(cx).text_snapshot();
798 let _ = state.reparse_conflict_markers(buffer_snapshot, cx);
799 });
800
801 conflict_set
802 }
803
804 pub fn project_path_git_status(
805 &self,
806 project_path: &ProjectPath,
807 cx: &App,
808 ) -> Option<FileStatus> {
809 let (repo, repo_path) = self.repository_and_path_for_project_path(project_path, cx)?;
810 Some(repo.read(cx).status_for_path(&repo_path)?.status)
811 }
812
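    /// Captures a checkpoint of every repository in the store, keyed by each
    /// repository's work directory path.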
813 pub fn checkpoint(&self, cx: &mut App) -> Task<Result<GitStoreCheckpoint>> {
814 let mut work_directory_abs_paths = Vec::new();
815 let mut checkpoints = Vec::new();
816 for repository in self.repositories.values() {
817 repository.update(cx, |repository, _| {
818 work_directory_abs_paths.push(repository.snapshot.work_directory_abs_path.clone());
819 checkpoints.push(repository.checkpoint().map(|checkpoint| checkpoint?));
820 });
821 }
822
823 cx.background_executor().spawn(async move {
824 let checkpoints = future::try_join_all(checkpoints).await?;
825 Ok(GitStoreCheckpoint {
826 checkpoints_by_work_dir_abs_path: work_directory_abs_paths
827 .into_iter()
828 .zip(checkpoints)
829 .collect(),
830 })
831 })
832 }
833
834 pub fn restore_checkpoint(
835 &self,
836 checkpoint: GitStoreCheckpoint,
837 cx: &mut App,
838 ) -> Task<Result<()>> {
839 let repositories_by_work_dir_abs_path = self
840 .repositories
841 .values()
842 .map(|repo| (repo.read(cx).snapshot.work_directory_abs_path.clone(), repo))
843 .collect::<HashMap<_, _>>();
844
845 let mut tasks = Vec::new();
846 for (work_dir_abs_path, checkpoint) in checkpoint.checkpoints_by_work_dir_abs_path {
847 if let Some(repository) = repositories_by_work_dir_abs_path.get(&work_dir_abs_path) {
848 let restore = repository.update(cx, |repository, _| {
849 repository.restore_checkpoint(checkpoint)
850 });
851 tasks.push(async move { restore.await? });
852 }
853 }
854 cx.background_spawn(async move {
855 future::try_join_all(tasks).await?;
856 Ok(())
857 })
858 }
859
860 /// Compares two checkpoints, returning true if they are equal.
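    ///
    /// A hedged sketch of the checkpoint/compare flow (assumes
    /// `git_store: Entity<GitStore>` is in scope; error handling elided):
    ///
    /// ```ignore
    /// let before = git_store.update(cx, |store, cx| store.checkpoint(cx)).await?;
    /// // ... perform some work ...
    /// let after = git_store.update(cx, |store, cx| store.checkpoint(cx)).await?;
    /// let unchanged = git_store
    ///     .update(cx, |store, cx| store.compare_checkpoints(before, after, cx))
    ///     .await?;
    /// ```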
861 pub fn compare_checkpoints(
862 &self,
863 left: GitStoreCheckpoint,
864 mut right: GitStoreCheckpoint,
865 cx: &mut App,
866 ) -> Task<Result<bool>> {
867 let repositories_by_work_dir_abs_path = self
868 .repositories
869 .values()
870 .map(|repo| (repo.read(cx).snapshot.work_directory_abs_path.clone(), repo))
871 .collect::<HashMap<_, _>>();
872
873 let mut tasks = Vec::new();
874 for (work_dir_abs_path, left_checkpoint) in left.checkpoints_by_work_dir_abs_path {
875 if let Some(right_checkpoint) = right
876 .checkpoints_by_work_dir_abs_path
877 .remove(&work_dir_abs_path)
878 {
879 if let Some(repository) = repositories_by_work_dir_abs_path.get(&work_dir_abs_path)
880 {
881 let compare = repository.update(cx, |repository, _| {
882 repository.compare_checkpoints(left_checkpoint, right_checkpoint)
883 });
884
885 tasks.push(async move { compare.await? });
886 }
887 } else {
888 return Task::ready(Ok(false));
889 }
890 }
891 cx.background_spawn(async move {
892 Ok(future::try_join_all(tasks)
893 .await?
894 .into_iter()
895 .all(|result| result))
896 })
897 }
898
899 /// Blames a buffer.
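    ///
    /// Passing `None` for `version` blames the buffer's current contents.
    ///
    /// A hedged usage sketch (`git_store` and `buffer` are assumed to be in
    /// scope; error handling elided):
    ///
    /// ```ignore
    /// let blame = git_store
    ///     .update(cx, |store, cx| store.blame_buffer(&buffer, None, cx))
    ///     .await?;
    /// ```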
900 pub fn blame_buffer(
901 &self,
902 buffer: &Entity<Buffer>,
903 version: Option<clock::Global>,
904 cx: &mut App,
905 ) -> Task<Result<Option<Blame>>> {
906 let buffer = buffer.read(cx);
907 let Some((repo, repo_path)) =
908 self.repository_and_path_for_buffer_id(buffer.remote_id(), cx)
909 else {
910 return Task::ready(Err(anyhow!("failed to find a git repository for buffer")));
911 };
912 let content = match &version {
913 Some(version) => buffer.rope_for_version(version).clone(),
914 None => buffer.as_rope().clone(),
915 };
916 let version = version.unwrap_or(buffer.version());
917 let buffer_id = buffer.remote_id();
918
919 let rx = repo.update(cx, |repo, _| {
920 repo.send_job(None, move |state, _| async move {
921 match state {
922 RepositoryState::Local { backend, .. } => backend
923 .blame(repo_path.clone(), content)
924 .await
925 .with_context(|| format!("Failed to blame {:?}", repo_path.0))
926 .map(Some),
927 RepositoryState::Remote { project_id, client } => {
928 let response = client
929 .request(proto::BlameBuffer {
930 project_id: project_id.to_proto(),
931 buffer_id: buffer_id.into(),
932 version: serialize_version(&version),
933 })
934 .await?;
935 Ok(deserialize_blame_buffer_response(response))
936 }
937 }
938 })
939 });
940
941 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
942 }
943
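    /// Builds a permalink URL to the given line range of the buffer on its
    /// git hosting provider, falling back to crate metadata for Rust sources
    /// that live in the Cargo registry.
    ///
    /// A hedged usage sketch (the exact row-range convention is assumed;
    /// `git_store` and `buffer` are taken to be in scope):
    ///
    /// ```ignore
    /// let url = git_store
    ///     .update(cx, |store, cx| store.get_permalink_to_line(&buffer, 0..10, cx))
    ///     .await?;
    /// ```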
944 pub fn get_permalink_to_line(
945 &self,
946 buffer: &Entity<Buffer>,
947 selection: Range<u32>,
948 cx: &mut App,
949 ) -> Task<Result<url::Url>> {
950 let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
951 return Task::ready(Err(anyhow!("buffer has no file")));
952 };
953
954 let Some((repo, repo_path)) = self.repository_and_path_for_project_path(
955 &(file.worktree.read(cx).id(), file.path.clone()).into(),
956 cx,
957 ) else {
958 // If we're not in a Git repo, check whether this is a Rust source
959 // file in the Cargo registry (presumably opened with go-to-definition
960 // from a normal Rust file). If so, we can put together a permalink
961 // using crate metadata.
962 if buffer
963 .read(cx)
964 .language()
965 .is_none_or(|lang| lang.name() != "Rust".into())
966 {
967 return Task::ready(Err(anyhow!("no permalink available")));
968 }
969 let Some(file_path) = file.worktree.read(cx).absolutize(&file.path).ok() else {
970 return Task::ready(Err(anyhow!("no permalink available")));
971 };
972 return cx.spawn(async move |cx| {
973 let provider_registry = cx.update(GitHostingProviderRegistry::default_global)?;
974 get_permalink_in_rust_registry_src(provider_registry, file_path, selection)
975 .context("no permalink available")
976 });
977
978 // TODO remote case
979 };
980
981 let buffer_id = buffer.read(cx).remote_id();
982 let branch = repo.read(cx).branch.clone();
983 let remote = branch
984 .as_ref()
985 .and_then(|b| b.upstream.as_ref())
986 .and_then(|b| b.remote_name())
987 .unwrap_or("origin")
988 .to_string();
989
990 let rx = repo.update(cx, |repo, _| {
991 repo.send_job(None, move |state, cx| async move {
992 match state {
993 RepositoryState::Local { backend, .. } => {
994 let origin_url = backend
995 .remote_url(&remote)
996 .with_context(|| format!("remote \"{remote}\" not found"))?;
997
998 let sha = backend.head_sha().await.context("reading HEAD SHA")?;
999
1000 let provider_registry =
1001 cx.update(GitHostingProviderRegistry::default_global)?;
1002
1003 let (provider, remote) =
1004 parse_git_remote_url(provider_registry, &origin_url)
1005 .context("parsing Git remote URL")?;
1006
1007 let path = repo_path.to_str().with_context(|| {
1008 format!("converting repo path {repo_path:?} to string")
1009 })?;
1010
1011 Ok(provider.build_permalink(
1012 remote,
1013 BuildPermalinkParams {
1014 sha: &sha,
1015 path,
1016 selection: Some(selection),
1017 },
1018 ))
1019 }
1020 RepositoryState::Remote { project_id, client } => {
1021 let response = client
1022 .request(proto::GetPermalinkToLine {
1023 project_id: project_id.to_proto(),
1024 buffer_id: buffer_id.into(),
1025 selection: Some(proto::Range {
1026 start: selection.start as u64,
1027 end: selection.end as u64,
1028 }),
1029 })
1030 .await?;
1031
1032 url::Url::parse(&response.permalink).context("failed to parse permalink")
1033 }
1034 }
1035 })
1036 });
1037 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
1038 }
1039
1040 fn downstream_client(&self) -> Option<(AnyProtoClient, ProjectId)> {
1041 match &self.state {
1042 GitStoreState::Local {
1043 downstream: downstream_client,
1044 ..
1045 } => downstream_client
1046 .as_ref()
1047 .map(|state| (state.client.clone(), state.project_id)),
1048 GitStoreState::Ssh {
1049 downstream: downstream_client,
1050 ..
1051 } => downstream_client.clone(),
1052 GitStoreState::Remote { .. } => None,
1053 }
1054 }
1055
1056 fn upstream_client(&self) -> Option<AnyProtoClient> {
1057 match &self.state {
1058 GitStoreState::Local { .. } => None,
1059 GitStoreState::Ssh {
1060 upstream_client, ..
1061 }
1062 | GitStoreState::Remote {
1063 upstream_client, ..
1064 } => Some(upstream_client.clone()),
1065 }
1066 }
1067
1068 fn on_worktree_store_event(
1069 &mut self,
1070 worktree_store: Entity<WorktreeStore>,
1071 event: &WorktreeStoreEvent,
1072 cx: &mut Context<Self>,
1073 ) {
1074 let GitStoreState::Local {
1075 project_environment,
1076 downstream,
1077 next_repository_id,
1078 fs,
1079 } = &self.state
1080 else {
1081 return;
1082 };
1083
1084 match event {
1085 WorktreeStoreEvent::WorktreeUpdatedEntries(worktree_id, updated_entries) => {
1086 let mut paths_by_git_repo = HashMap::<_, Vec<_>>::default();
1087 for (relative_path, _, _) in updated_entries.iter() {
1088 let Some((repo, repo_path)) = self.repository_and_path_for_project_path(
1089 &(*worktree_id, relative_path.clone()).into(),
1090 cx,
1091 ) else {
1092 continue;
1093 };
1094 paths_by_git_repo.entry(repo).or_default().push(repo_path)
1095 }
1096
1097 for (repo, paths) in paths_by_git_repo {
1098 repo.update(cx, |repo, cx| {
1099 repo.paths_changed(
1100 paths,
1101 downstream
1102 .as_ref()
1103 .map(|downstream| downstream.updates_tx.clone()),
1104 cx,
1105 );
1106 });
1107 }
1108 }
1109 WorktreeStoreEvent::WorktreeUpdatedGitRepositories(worktree_id, changed_repos) => {
1110 let Some(worktree) = worktree_store.read(cx).worktree_for_id(*worktree_id, cx)
1111 else {
1112 return;
1113 };
1114 if !worktree.read(cx).is_visible() {
1115 log::debug!(
1116 "not adding repositories for local worktree {:?} because it's not visible",
1117 worktree.read(cx).abs_path()
1118 );
1119 return;
1120 }
1121 self.update_repositories_from_worktree(
1122 project_environment.clone(),
1123 next_repository_id.clone(),
1124 downstream
1125 .as_ref()
1126 .map(|downstream| downstream.updates_tx.clone()),
1127 changed_repos.clone(),
1128 fs.clone(),
1129 cx,
1130 );
1131 self.local_worktree_git_repos_changed(worktree, changed_repos, cx);
1132 }
1133 _ => {}
1134 }
1135 }
1136
1137 fn on_repository_event(
1138 &mut self,
1139 repo: Entity<Repository>,
1140 event: &RepositoryEvent,
1141 cx: &mut Context<Self>,
1142 ) {
1143 let id = repo.read(cx).id;
1144 let repo_snapshot = repo.read(cx).snapshot.clone();
1145 for (buffer_id, diff) in self.diffs.iter() {
1146 if let Some((buffer_repo, repo_path)) =
1147 self.repository_and_path_for_buffer_id(*buffer_id, cx)
1148 {
1149 if buffer_repo == repo {
1150 diff.update(cx, |diff, cx| {
1151 if let Some(conflict_set) = &diff.conflict_set {
1152 let conflict_status_changed =
1153 conflict_set.update(cx, |conflict_set, cx| {
1154 let has_conflict = repo_snapshot.has_conflict(&repo_path);
1155 conflict_set.set_has_conflict(has_conflict, cx)
1156 })?;
1157 if conflict_status_changed {
1158 let buffer_store = self.buffer_store.read(cx);
1159 if let Some(buffer) = buffer_store.get(*buffer_id) {
1160 let _ = diff.reparse_conflict_markers(
1161 buffer.read(cx).text_snapshot(),
1162 cx,
1163 );
1164 }
1165 }
1166 }
1167 anyhow::Ok(())
1168 })
1169 .ok();
1170 }
1171 }
1172 }
1173 cx.emit(GitStoreEvent::RepositoryUpdated(
1174 id,
1175 event.clone(),
1176 self.active_repo_id == Some(id),
1177 ))
1178 }
1179
1180 fn on_jobs_updated(&mut self, _: Entity<Repository>, _: &JobsUpdated, cx: &mut Context<Self>) {
1181 cx.emit(GitStoreEvent::JobsUpdated)
1182 }
1183
    /// Update our list of repositories and schedule git scans in response to a notification from a worktree.
1185 fn update_repositories_from_worktree(
1186 &mut self,
1187 project_environment: Entity<ProjectEnvironment>,
1188 next_repository_id: Arc<AtomicU64>,
1189 updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
1190 updated_git_repositories: UpdatedGitRepositoriesSet,
1191 fs: Arc<dyn Fs>,
1192 cx: &mut Context<Self>,
1193 ) {
1194 let mut removed_ids = Vec::new();
1195 for update in updated_git_repositories.iter() {
1196 if let Some((id, existing)) = self.repositories.iter().find(|(_, repo)| {
1197 let existing_work_directory_abs_path =
1198 repo.read(cx).work_directory_abs_path.clone();
1199 Some(&existing_work_directory_abs_path)
1200 == update.old_work_directory_abs_path.as_ref()
1201 || Some(&existing_work_directory_abs_path)
1202 == update.new_work_directory_abs_path.as_ref()
1203 }) {
1204 if let Some(new_work_directory_abs_path) =
1205 update.new_work_directory_abs_path.clone()
1206 {
1207 existing.update(cx, |existing, cx| {
1208 existing.snapshot.work_directory_abs_path = new_work_directory_abs_path;
1209 existing.schedule_scan(updates_tx.clone(), cx);
1210 });
1211 } else {
1212 removed_ids.push(*id);
1213 }
1214 } else if let UpdatedGitRepository {
1215 new_work_directory_abs_path: Some(work_directory_abs_path),
1216 dot_git_abs_path: Some(dot_git_abs_path),
1217 repository_dir_abs_path: Some(repository_dir_abs_path),
1218 common_dir_abs_path: Some(common_dir_abs_path),
1219 ..
1220 } = update
1221 {
1222 let id = RepositoryId(next_repository_id.fetch_add(1, atomic::Ordering::Release));
1223 let git_store = cx.weak_entity();
1224 let repo = cx.new(|cx| {
1225 let mut repo = Repository::local(
1226 id,
1227 work_directory_abs_path.clone(),
1228 dot_git_abs_path.clone(),
1229 repository_dir_abs_path.clone(),
1230 common_dir_abs_path.clone(),
1231 project_environment.downgrade(),
1232 fs.clone(),
1233 git_store,
1234 cx,
1235 );
1236 repo.schedule_scan(updates_tx.clone(), cx);
1237 repo
1238 });
1239 self._subscriptions
1240 .push(cx.subscribe(&repo, Self::on_repository_event));
1241 self._subscriptions
1242 .push(cx.subscribe(&repo, Self::on_jobs_updated));
1243 self.repositories.insert(id, repo);
1244 cx.emit(GitStoreEvent::RepositoryAdded(id));
1245 self.active_repo_id.get_or_insert_with(|| {
1246 cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
1247 id
1248 });
1249 }
1250 }
1251
1252 for id in removed_ids {
1253 if self.active_repo_id == Some(id) {
1254 self.active_repo_id = None;
1255 cx.emit(GitStoreEvent::ActiveRepositoryChanged(None));
1256 }
1257 self.repositories.remove(&id);
1258 if let Some(updates_tx) = updates_tx.as_ref() {
1259 updates_tx
1260 .unbounded_send(DownstreamUpdate::RemoveRepository(id))
1261 .ok();
1262 }
1263 }
1264 }
1265
1266 fn on_buffer_store_event(
1267 &mut self,
1268 _: Entity<BufferStore>,
1269 event: &BufferStoreEvent,
1270 cx: &mut Context<Self>,
1271 ) {
1272 match event {
1273 BufferStoreEvent::BufferAdded(buffer) => {
1274 cx.subscribe(&buffer, |this, buffer, event, cx| {
1275 if let BufferEvent::LanguageChanged = event {
1276 let buffer_id = buffer.read(cx).remote_id();
1277 if let Some(diff_state) = this.diffs.get(&buffer_id) {
1278 diff_state.update(cx, |diff_state, cx| {
1279 diff_state.buffer_language_changed(buffer, cx);
1280 });
1281 }
1282 }
1283 })
1284 .detach();
1285 }
1286 BufferStoreEvent::SharedBufferClosed(peer_id, buffer_id) => {
1287 if let Some(diffs) = self.shared_diffs.get_mut(peer_id) {
1288 diffs.remove(buffer_id);
1289 }
1290 }
1291 BufferStoreEvent::BufferDropped(buffer_id) => {
1292 self.diffs.remove(&buffer_id);
1293 for diffs in self.shared_diffs.values_mut() {
1294 diffs.remove(buffer_id);
1295 }
1296 }
1297
1298 _ => {}
1299 }
1300 }
1301
1302 pub fn recalculate_buffer_diffs(
1303 &mut self,
1304 buffers: Vec<Entity<Buffer>>,
1305 cx: &mut Context<Self>,
1306 ) -> impl Future<Output = ()> + use<> {
1307 let mut futures = Vec::new();
1308 for buffer in buffers {
1309 if let Some(diff_state) = self.diffs.get_mut(&buffer.read(cx).remote_id()) {
1310 let buffer = buffer.read(cx).text_snapshot();
1311 diff_state.update(cx, |diff_state, cx| {
1312 diff_state.recalculate_diffs(buffer.clone(), cx);
1313 futures.extend(diff_state.wait_for_recalculation().map(FutureExt::boxed));
1314 });
1315 futures.push(diff_state.update(cx, |diff_state, cx| {
1316 diff_state
1317 .reparse_conflict_markers(buffer, cx)
1318 .map(|_| {})
1319 .boxed()
1320 }));
1321 }
1322 }
1323 async move {
1324 futures::future::join_all(futures).await;
1325 }
1326 }
1327
1328 fn on_buffer_diff_event(
1329 &mut self,
1330 diff: Entity<buffer_diff::BufferDiff>,
1331 event: &BufferDiffEvent,
1332 cx: &mut Context<Self>,
1333 ) {
1334 if let BufferDiffEvent::HunksStagedOrUnstaged(new_index_text) = event {
1335 let buffer_id = diff.read(cx).buffer_id;
1336 if let Some(diff_state) = self.diffs.get(&buffer_id) {
1337 let hunk_staging_operation_count = diff_state.update(cx, |diff_state, _| {
1338 diff_state.hunk_staging_operation_count += 1;
1339 diff_state.hunk_staging_operation_count
1340 });
1341 if let Some((repo, path)) = self.repository_and_path_for_buffer_id(buffer_id, cx) {
1342 let recv = repo.update(cx, |repo, cx| {
1343 log::debug!("hunks changed for {}", path.display());
1344 repo.spawn_set_index_text_job(
1345 path,
1346 new_index_text.as_ref().map(|rope| rope.to_string()),
1347 Some(hunk_staging_operation_count),
1348 cx,
1349 )
1350 });
1351 let diff = diff.downgrade();
1352 cx.spawn(async move |this, cx| {
1353 if let Ok(Err(error)) = cx.background_spawn(recv).await {
1354 diff.update(cx, |diff, cx| {
1355 diff.clear_pending_hunks(cx);
1356 })
1357 .ok();
1358 this.update(cx, |_, cx| cx.emit(GitStoreEvent::IndexWriteError(error)))
1359 .ok();
1360 }
1361 })
1362 .detach();
1363 }
1364 }
1365 }
1366 }
1367
1368 fn local_worktree_git_repos_changed(
1369 &mut self,
1370 worktree: Entity<Worktree>,
1371 changed_repos: &UpdatedGitRepositoriesSet,
1372 cx: &mut Context<Self>,
1373 ) {
1374 log::debug!("local worktree repos changed");
1375 debug_assert!(worktree.read(cx).is_local());
1376
1377 for repository in self.repositories.values() {
1378 repository.update(cx, |repository, cx| {
1379 let repo_abs_path = &repository.work_directory_abs_path;
1380 if changed_repos.iter().any(|update| {
1381 update.old_work_directory_abs_path.as_ref() == Some(&repo_abs_path)
1382 || update.new_work_directory_abs_path.as_ref() == Some(&repo_abs_path)
1383 }) {
1384 repository.reload_buffer_diff_bases(cx);
1385 }
1386 });
1387 }
1388 }
1389
1390 pub fn repositories(&self) -> &HashMap<RepositoryId, Entity<Repository>> {
1391 &self.repositories
1392 }
1393
1394 pub fn status_for_buffer_id(&self, buffer_id: BufferId, cx: &App) -> Option<FileStatus> {
1395 let (repo, path) = self.repository_and_path_for_buffer_id(buffer_id, cx)?;
1396 let status = repo.read(cx).snapshot.status_for_path(&path)?;
1397 Some(status.status)
1398 }
1399
1400 pub fn repository_and_path_for_buffer_id(
1401 &self,
1402 buffer_id: BufferId,
1403 cx: &App,
1404 ) -> Option<(Entity<Repository>, RepoPath)> {
1405 let buffer = self.buffer_store.read(cx).get(buffer_id)?;
1406 let project_path = buffer.read(cx).project_path(cx)?;
1407 self.repository_and_path_for_project_path(&project_path, cx)
1408 }
1409
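    /// Resolves a project path to the repository whose work directory contains
    /// it, along with the corresponding repository-relative path. When
    /// repositories are nested, the most deeply nested work directory wins.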
1410 pub fn repository_and_path_for_project_path(
1411 &self,
1412 path: &ProjectPath,
1413 cx: &App,
1414 ) -> Option<(Entity<Repository>, RepoPath)> {
1415 let abs_path = self.worktree_store.read(cx).absolutize(path, cx)?;
1416 self.repositories
1417 .values()
1418 .filter_map(|repo| {
1419 let repo_path = repo.read(cx).abs_path_to_repo_path(&abs_path)?;
1420 Some((repo.clone(), repo_path))
1421 })
1422 .max_by_key(|(repo, _)| repo.read(cx).work_directory_abs_path.clone())
1423 }
1424
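    /// Initializes a new git repository at `path`, either directly on the
    /// local filesystem or by forwarding the request to the upstream project.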
1425 pub fn git_init(
1426 &self,
1427 path: Arc<Path>,
1428 fallback_branch_name: String,
1429 cx: &App,
1430 ) -> Task<Result<()>> {
1431 match &self.state {
1432 GitStoreState::Local { fs, .. } => {
1433 let fs = fs.clone();
1434 cx.background_executor()
1435 .spawn(async move { fs.git_init(&path, fallback_branch_name) })
1436 }
1437 GitStoreState::Ssh {
1438 upstream_client,
1439 upstream_project_id: project_id,
1440 ..
1441 }
1442 | GitStoreState::Remote {
1443 upstream_client,
1444 upstream_project_id: project_id,
1445 ..
1446 } => {
1447 let client = upstream_client.clone();
1448 let project_id = *project_id;
1449 cx.background_executor().spawn(async move {
1450 client
1451 .request(proto::GitInit {
1452 project_id: project_id.0,
1453 abs_path: path.to_string_lossy().to_string(),
1454 fallback_branch_name,
1455 })
1456 .await?;
1457 Ok(())
1458 })
1459 }
1460 }
1461 }
1462
1463 async fn handle_update_repository(
1464 this: Entity<Self>,
1465 envelope: TypedEnvelope<proto::UpdateRepository>,
1466 mut cx: AsyncApp,
1467 ) -> Result<()> {
1468 this.update(&mut cx, |this, cx| {
1469 let mut update = envelope.payload;
1470
1471 let id = RepositoryId::from_proto(update.id);
1472 let client = this
1473 .upstream_client()
1474 .context("no upstream client")?
1475 .clone();
1476
1477 let mut is_new = false;
1478 let repo = this.repositories.entry(id).or_insert_with(|| {
1479 is_new = true;
1480 let git_store = cx.weak_entity();
1481 cx.new(|cx| {
1482 Repository::remote(
1483 id,
1484 Path::new(&update.abs_path).into(),
1485 ProjectId(update.project_id),
1486 client,
1487 git_store,
1488 cx,
1489 )
1490 })
1491 });
1492 if is_new {
1493 this._subscriptions
1494 .push(cx.subscribe(&repo, Self::on_repository_event))
1495 }
1496
1497 repo.update(cx, {
1498 let update = update.clone();
1499 |repo, cx| repo.apply_remote_update(update, is_new, cx)
1500 })?;
1501
1502 this.active_repo_id.get_or_insert_with(|| {
1503 cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
1504 id
1505 });
1506
1507 if let Some((client, project_id)) = this.downstream_client() {
1508 update.project_id = project_id.to_proto();
1509 client.send(update).log_err();
1510 }
1511 Ok(())
1512 })?
1513 }
1514
1515 async fn handle_remove_repository(
1516 this: Entity<Self>,
1517 envelope: TypedEnvelope<proto::RemoveRepository>,
1518 mut cx: AsyncApp,
1519 ) -> Result<()> {
1520 this.update(&mut cx, |this, cx| {
1521 let mut update = envelope.payload;
1522 let id = RepositoryId::from_proto(update.id);
1523 this.repositories.remove(&id);
1524 if let Some((client, project_id)) = this.downstream_client() {
1525 update.project_id = project_id.to_proto();
1526 client.send(update).log_err();
1527 }
1528 if this.active_repo_id == Some(id) {
1529 this.active_repo_id = None;
1530 cx.emit(GitStoreEvent::ActiveRepositoryChanged(None));
1531 }
1532 cx.emit(GitStoreEvent::RepositoryRemoved(id));
1533 })
1534 }
1535
1536 async fn handle_git_init(
1537 this: Entity<Self>,
1538 envelope: TypedEnvelope<proto::GitInit>,
1539 cx: AsyncApp,
1540 ) -> Result<proto::Ack> {
1541 let path: Arc<Path> = PathBuf::from(envelope.payload.abs_path).into();
1542 let name = envelope.payload.fallback_branch_name;
1543 cx.update(|cx| this.read(cx).git_init(path, name, cx))?
1544 .await?;
1545
1546 Ok(proto::Ack {})
1547 }
1548
1549 async fn handle_fetch(
1550 this: Entity<Self>,
1551 envelope: TypedEnvelope<proto::Fetch>,
1552 mut cx: AsyncApp,
1553 ) -> Result<proto::RemoteMessageResponse> {
1554 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1555 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1556 let fetch_options = FetchOptions::from_proto(envelope.payload.remote);
1557 let askpass_id = envelope.payload.askpass_id;
1558
1559 let askpass = make_remote_delegate(
1560 this,
1561 envelope.payload.project_id,
1562 repository_id,
1563 askpass_id,
1564 &mut cx,
1565 );
1566
1567 let remote_output = repository_handle
1568 .update(&mut cx, |repository_handle, cx| {
1569 repository_handle.fetch(fetch_options, askpass, cx)
1570 })?
1571 .await??;
1572
1573 Ok(proto::RemoteMessageResponse {
1574 stdout: remote_output.stdout,
1575 stderr: remote_output.stderr,
1576 })
1577 }
1578
1579 async fn handle_push(
1580 this: Entity<Self>,
1581 envelope: TypedEnvelope<proto::Push>,
1582 mut cx: AsyncApp,
1583 ) -> Result<proto::RemoteMessageResponse> {
1584 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1585 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1586
1587 let askpass_id = envelope.payload.askpass_id;
1588 let askpass = make_remote_delegate(
1589 this,
1590 envelope.payload.project_id,
1591 repository_id,
1592 askpass_id,
1593 &mut cx,
1594 );
1595
1596 let options = envelope
1597 .payload
1598 .options
1599 .as_ref()
1600 .map(|_| match envelope.payload.options() {
1601 proto::push::PushOptions::SetUpstream => git::repository::PushOptions::SetUpstream,
1602 proto::push::PushOptions::Force => git::repository::PushOptions::Force,
1603 });
1604
1605 let branch_name = envelope.payload.branch_name.into();
1606 let remote_name = envelope.payload.remote_name.into();
1607
1608 let remote_output = repository_handle
1609 .update(&mut cx, |repository_handle, cx| {
1610 repository_handle.push(branch_name, remote_name, options, askpass, cx)
1611 })?
1612 .await??;
1613 Ok(proto::RemoteMessageResponse {
1614 stdout: remote_output.stdout,
1615 stderr: remote_output.stderr,
1616 })
1617 }
1618
1619 async fn handle_pull(
1620 this: Entity<Self>,
1621 envelope: TypedEnvelope<proto::Pull>,
1622 mut cx: AsyncApp,
1623 ) -> Result<proto::RemoteMessageResponse> {
1624 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1625 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1626 let askpass_id = envelope.payload.askpass_id;
1627 let askpass = make_remote_delegate(
1628 this,
1629 envelope.payload.project_id,
1630 repository_id,
1631 askpass_id,
1632 &mut cx,
1633 );
1634
1635 let branch_name = envelope.payload.branch_name.into();
1636 let remote_name = envelope.payload.remote_name.into();
1637
1638 let remote_message = repository_handle
1639 .update(&mut cx, |repository_handle, cx| {
1640 repository_handle.pull(branch_name, remote_name, askpass, cx)
1641 })?
1642 .await??;
1643
1644 Ok(proto::RemoteMessageResponse {
1645 stdout: remote_message.stdout,
1646 stderr: remote_message.stderr,
1647 })
1648 }
1649
1650 async fn handle_stage(
1651 this: Entity<Self>,
1652 envelope: TypedEnvelope<proto::Stage>,
1653 mut cx: AsyncApp,
1654 ) -> Result<proto::Ack> {
1655 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1656 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1657
1658 let entries = envelope
1659 .payload
1660 .paths
1661 .into_iter()
1662 .map(PathBuf::from)
1663 .map(RepoPath::new)
1664 .collect();
1665
1666 repository_handle
1667 .update(&mut cx, |repository_handle, cx| {
1668 repository_handle.stage_entries(entries, cx)
1669 })?
1670 .await?;
1671 Ok(proto::Ack {})
1672 }
1673
1674 async fn handle_unstage(
1675 this: Entity<Self>,
1676 envelope: TypedEnvelope<proto::Unstage>,
1677 mut cx: AsyncApp,
1678 ) -> Result<proto::Ack> {
1679 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1680 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1681
1682 let entries = envelope
1683 .payload
1684 .paths
1685 .into_iter()
1686 .map(PathBuf::from)
1687 .map(RepoPath::new)
1688 .collect();
1689
1690 repository_handle
1691 .update(&mut cx, |repository_handle, cx| {
1692 repository_handle.unstage_entries(entries, cx)
1693 })?
1694 .await?;
1695
1696 Ok(proto::Ack {})
1697 }
1698
1699 async fn handle_set_index_text(
1700 this: Entity<Self>,
1701 envelope: TypedEnvelope<proto::SetIndexText>,
1702 mut cx: AsyncApp,
1703 ) -> Result<proto::Ack> {
1704 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1705 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1706 let repo_path = RepoPath::from_str(&envelope.payload.path);
1707
1708 repository_handle
1709 .update(&mut cx, |repository_handle, cx| {
1710 repository_handle.spawn_set_index_text_job(
1711 repo_path,
1712 envelope.payload.text,
1713 None,
1714 cx,
1715 )
1716 })?
1717 .await??;
1718 Ok(proto::Ack {})
1719 }
1720
1721 async fn handle_commit(
1722 this: Entity<Self>,
1723 envelope: TypedEnvelope<proto::Commit>,
1724 mut cx: AsyncApp,
1725 ) -> Result<proto::Ack> {
1726 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1727 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1728
1729 let message = SharedString::from(envelope.payload.message);
1730 let name = envelope.payload.name.map(SharedString::from);
1731 let email = envelope.payload.email.map(SharedString::from);
1732 let options = envelope.payload.options.unwrap_or_default();
1733
1734 repository_handle
1735 .update(&mut cx, |repository_handle, cx| {
1736 repository_handle.commit(
1737 message,
1738 name.zip(email),
1739 CommitOptions {
1740 amend: options.amend,
1741 signoff: options.signoff,
1742 },
1743 cx,
1744 )
1745 })?
1746 .await??;
1747 Ok(proto::Ack {})
1748 }
1749
1750 async fn handle_get_remotes(
1751 this: Entity<Self>,
1752 envelope: TypedEnvelope<proto::GetRemotes>,
1753 mut cx: AsyncApp,
1754 ) -> Result<proto::GetRemotesResponse> {
1755 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1756 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1757
1758 let branch_name = envelope.payload.branch_name;
1759
1760 let remotes = repository_handle
1761 .update(&mut cx, |repository_handle, _| {
1762 repository_handle.get_remotes(branch_name)
1763 })?
1764 .await??;
1765
1766 Ok(proto::GetRemotesResponse {
1767 remotes: remotes
1768 .into_iter()
1769 .map(|remotes| proto::get_remotes_response::Remote {
1770 name: remotes.name.to_string(),
1771 })
1772 .collect::<Vec<_>>(),
1773 })
1774 }
1775
1776 async fn handle_get_branches(
1777 this: Entity<Self>,
1778 envelope: TypedEnvelope<proto::GitGetBranches>,
1779 mut cx: AsyncApp,
1780 ) -> Result<proto::GitBranchesResponse> {
1781 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1782 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1783
1784 let branches = repository_handle
1785 .update(&mut cx, |repository_handle, _| repository_handle.branches())?
1786 .await??;
1787
1788 Ok(proto::GitBranchesResponse {
1789 branches: branches
1790 .into_iter()
1791 .map(|branch| branch_to_proto(&branch))
1792 .collect::<Vec<_>>(),
1793 })
    }

    async fn handle_create_branch(
1796 this: Entity<Self>,
1797 envelope: TypedEnvelope<proto::GitCreateBranch>,
1798 mut cx: AsyncApp,
1799 ) -> Result<proto::Ack> {
1800 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1801 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1802 let branch_name = envelope.payload.branch_name;
1803
1804 repository_handle
1805 .update(&mut cx, |repository_handle, _| {
1806 repository_handle.create_branch(branch_name)
1807 })?
1808 .await??;
1809
1810 Ok(proto::Ack {})
1811 }
1812
1813 async fn handle_change_branch(
1814 this: Entity<Self>,
1815 envelope: TypedEnvelope<proto::GitChangeBranch>,
1816 mut cx: AsyncApp,
1817 ) -> Result<proto::Ack> {
1818 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1819 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1820 let branch_name = envelope.payload.branch_name;
1821
1822 repository_handle
1823 .update(&mut cx, |repository_handle, _| {
1824 repository_handle.change_branch(branch_name)
1825 })?
1826 .await??;
1827
1828 Ok(proto::Ack {})
1829 }
1830
1831 async fn handle_show(
1832 this: Entity<Self>,
1833 envelope: TypedEnvelope<proto::GitShow>,
1834 mut cx: AsyncApp,
1835 ) -> Result<proto::GitCommitDetails> {
1836 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1837 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1838
1839 let commit = repository_handle
1840 .update(&mut cx, |repository_handle, _| {
1841 repository_handle.show(envelope.payload.commit)
1842 })?
1843 .await??;
1844 Ok(proto::GitCommitDetails {
1845 sha: commit.sha.into(),
1846 message: commit.message.into(),
1847 commit_timestamp: commit.commit_timestamp,
1848 author_email: commit.author_email.into(),
1849 author_name: commit.author_name.into(),
1850 })
1851 }
1852
1853 async fn handle_load_commit_diff(
1854 this: Entity<Self>,
1855 envelope: TypedEnvelope<proto::LoadCommitDiff>,
1856 mut cx: AsyncApp,
1857 ) -> Result<proto::LoadCommitDiffResponse> {
1858 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1859 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1860
1861 let commit_diff = repository_handle
1862 .update(&mut cx, |repository_handle, _| {
1863 repository_handle.load_commit_diff(envelope.payload.commit)
1864 })?
1865 .await??;
1866 Ok(proto::LoadCommitDiffResponse {
1867 files: commit_diff
1868 .files
1869 .into_iter()
1870 .map(|file| proto::CommitFile {
1871 path: file.path.to_string(),
1872 old_text: file.old_text,
1873 new_text: file.new_text,
1874 })
1875 .collect(),
1876 })
1877 }
1878
1879 async fn handle_reset(
1880 this: Entity<Self>,
1881 envelope: TypedEnvelope<proto::GitReset>,
1882 mut cx: AsyncApp,
1883 ) -> Result<proto::Ack> {
1884 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1885 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1886
1887 let mode = match envelope.payload.mode() {
1888 git_reset::ResetMode::Soft => ResetMode::Soft,
1889 git_reset::ResetMode::Mixed => ResetMode::Mixed,
1890 };
1891
1892 repository_handle
1893 .update(&mut cx, |repository_handle, cx| {
1894 repository_handle.reset(envelope.payload.commit, mode, cx)
1895 })?
1896 .await??;
1897 Ok(proto::Ack {})
1898 }
1899
1900 async fn handle_checkout_files(
1901 this: Entity<Self>,
1902 envelope: TypedEnvelope<proto::GitCheckoutFiles>,
1903 mut cx: AsyncApp,
1904 ) -> Result<proto::Ack> {
1905 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1906 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1907 let paths = envelope
1908 .payload
1909 .paths
1910 .iter()
1911 .map(|s| RepoPath::from_str(s))
1912 .collect();
1913
1914 repository_handle
1915 .update(&mut cx, |repository_handle, cx| {
1916 repository_handle.checkout_files(&envelope.payload.commit, paths, cx)
1917 })?
1918 .await??;
1919 Ok(proto::Ack {})
1920 }
1921
1922 async fn handle_open_commit_message_buffer(
1923 this: Entity<Self>,
1924 envelope: TypedEnvelope<proto::OpenCommitMessageBuffer>,
1925 mut cx: AsyncApp,
1926 ) -> Result<proto::OpenBufferResponse> {
1927 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1928 let repository = Self::repository_for_request(&this, repository_id, &mut cx)?;
1929 let buffer = repository
1930 .update(&mut cx, |repository, cx| {
1931 repository.open_commit_buffer(None, this.read(cx).buffer_store.clone(), cx)
1932 })?
1933 .await?;
1934
1935 let buffer_id = buffer.read_with(&cx, |buffer, _| buffer.remote_id())?;
1936 this.update(&mut cx, |this, cx| {
1937 this.buffer_store.update(cx, |buffer_store, cx| {
1938 buffer_store
1939 .create_buffer_for_peer(
1940 &buffer,
1941 envelope.original_sender_id.unwrap_or(envelope.sender_id),
1942 cx,
1943 )
1944 .detach_and_log_err(cx);
1945 })
1946 })?;
1947
1948 Ok(proto::OpenBufferResponse {
1949 buffer_id: buffer_id.to_proto(),
1950 })
1951 }
1952
1953 async fn handle_askpass(
1954 this: Entity<Self>,
1955 envelope: TypedEnvelope<proto::AskPassRequest>,
1956 mut cx: AsyncApp,
1957 ) -> Result<proto::AskPassResponse> {
1958 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1959 let repository = Self::repository_for_request(&this, repository_id, &mut cx)?;
1960
1961 let delegates = cx.update(|cx| repository.read(cx).askpass_delegates.clone())?;
1962 let Some(mut askpass) = delegates.lock().remove(&envelope.payload.askpass_id) else {
1963 debug_panic!("no askpass found");
1964 anyhow::bail!("no askpass found");
1965 };
1966
1967 let response = askpass.ask_password(envelope.payload.prompt).await?;
1968
1969 delegates
1970 .lock()
1971 .insert(envelope.payload.askpass_id, askpass);
1972
1973 Ok(proto::AskPassResponse { response })
1974 }
1975
1976 async fn handle_check_for_pushed_commits(
1977 this: Entity<Self>,
1978 envelope: TypedEnvelope<proto::CheckForPushedCommits>,
1979 mut cx: AsyncApp,
1980 ) -> Result<proto::CheckForPushedCommitsResponse> {
1981 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1982 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1983
1984 let branches = repository_handle
1985 .update(&mut cx, |repository_handle, _| {
1986 repository_handle.check_for_pushed_commits()
1987 })?
1988 .await??;
1989 Ok(proto::CheckForPushedCommitsResponse {
1990 pushed_to: branches
1991 .into_iter()
1992 .map(|commit| commit.to_string())
1993 .collect(),
1994 })
1995 }
1996
1997 async fn handle_git_diff(
1998 this: Entity<Self>,
1999 envelope: TypedEnvelope<proto::GitDiff>,
2000 mut cx: AsyncApp,
2001 ) -> Result<proto::GitDiffResponse> {
2002 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2003 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2004 let diff_type = match envelope.payload.diff_type() {
2005 proto::git_diff::DiffType::HeadToIndex => DiffType::HeadToIndex,
2006 proto::git_diff::DiffType::HeadToWorktree => DiffType::HeadToWorktree,
2007 };
2008
2009 let mut diff = repository_handle
2010 .update(&mut cx, |repository_handle, cx| {
2011 repository_handle.diff(diff_type, cx)
2012 })?
2013 .await??;
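        // Cap the diff sent over the wire at roughly one megabyte, truncating by
        // character count so the result remains valid UTF-8.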
2014 const ONE_MB: usize = 1_000_000;
2015 if diff.len() > ONE_MB {
2016 diff = diff.chars().take(ONE_MB).collect()
2017 }
2018
2019 Ok(proto::GitDiffResponse { diff })
2020 }
2021
2022 async fn handle_open_unstaged_diff(
2023 this: Entity<Self>,
2024 request: TypedEnvelope<proto::OpenUnstagedDiff>,
2025 mut cx: AsyncApp,
2026 ) -> Result<proto::OpenUnstagedDiffResponse> {
2027 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2028 let diff = this
2029 .update(&mut cx, |this, cx| {
2030 let buffer = this.buffer_store.read(cx).get(buffer_id)?;
2031 Some(this.open_unstaged_diff(buffer, cx))
2032 })?
2033 .context("missing buffer")?
2034 .await?;
2035 this.update(&mut cx, |this, _| {
2036 let shared_diffs = this
2037 .shared_diffs
2038 .entry(request.original_sender_id.unwrap_or(request.sender_id))
2039 .or_default();
2040 shared_diffs.entry(buffer_id).or_default().unstaged = Some(diff.clone());
2041 })?;
2042 let staged_text = diff.read_with(&cx, |diff, _| diff.base_text_string())?;
2043 Ok(proto::OpenUnstagedDiffResponse { staged_text })
2044 }
2045
2046 async fn handle_open_uncommitted_diff(
2047 this: Entity<Self>,
2048 request: TypedEnvelope<proto::OpenUncommittedDiff>,
2049 mut cx: AsyncApp,
2050 ) -> Result<proto::OpenUncommittedDiffResponse> {
2051 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2052 let diff = this
2053 .update(&mut cx, |this, cx| {
2054 let buffer = this.buffer_store.read(cx).get(buffer_id)?;
2055 Some(this.open_uncommitted_diff(buffer, cx))
2056 })?
2057 .context("missing buffer")?
2058 .await?;
2059 this.update(&mut cx, |this, _| {
2060 let shared_diffs = this
2061 .shared_diffs
2062 .entry(request.original_sender_id.unwrap_or(request.sender_id))
2063 .or_default();
2064 shared_diffs.entry(buffer_id).or_default().uncommitted = Some(diff.clone());
2065 })?;
2066 diff.read_with(&cx, |diff, cx| {
2067 use proto::open_uncommitted_diff_response::Mode;
2068
2069 let unstaged_diff = diff.secondary_diff();
2070 let index_snapshot = unstaged_diff.and_then(|diff| {
2071 let diff = diff.read(cx);
2072 diff.base_text_exists().then(|| diff.base_text())
2073 });
2074
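            // Pick which base texts to send back: when the index snapshot is the same
            // buffer as the committed snapshot, report `IndexMatchesHead` and omit the
            // staged text instead of sending the committed text twice.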
2075 let mode;
2076 let staged_text;
2077 let committed_text;
2078 if diff.base_text_exists() {
2079 let committed_snapshot = diff.base_text();
2080 committed_text = Some(committed_snapshot.text());
2081 if let Some(index_text) = index_snapshot {
2082 if index_text.remote_id() == committed_snapshot.remote_id() {
2083 mode = Mode::IndexMatchesHead;
2084 staged_text = None;
2085 } else {
2086 mode = Mode::IndexAndHead;
2087 staged_text = Some(index_text.text());
2088 }
2089 } else {
2090 mode = Mode::IndexAndHead;
2091 staged_text = None;
2092 }
2093 } else {
2094 mode = Mode::IndexAndHead;
2095 committed_text = None;
2096 staged_text = index_snapshot.as_ref().map(|buffer| buffer.text());
2097 }
2098
2099 proto::OpenUncommittedDiffResponse {
2100 committed_text,
2101 staged_text,
2102 mode: mode.into(),
2103 }
2104 })
2105 }
2106
2107 async fn handle_update_diff_bases(
2108 this: Entity<Self>,
2109 request: TypedEnvelope<proto::UpdateDiffBases>,
2110 mut cx: AsyncApp,
2111 ) -> Result<()> {
2112 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2113 this.update(&mut cx, |this, cx| {
2114 if let Some(diff_state) = this.diffs.get_mut(&buffer_id) {
2115 if let Some(buffer) = this.buffer_store.read(cx).get(buffer_id) {
2116 let buffer = buffer.read(cx).text_snapshot();
2117 diff_state.update(cx, |diff_state, cx| {
2118 diff_state.handle_base_texts_updated(buffer, request.payload, cx);
2119 })
2120 }
2121 }
2122 })
2123 }
2124
2125 async fn handle_blame_buffer(
2126 this: Entity<Self>,
2127 envelope: TypedEnvelope<proto::BlameBuffer>,
2128 mut cx: AsyncApp,
2129 ) -> Result<proto::BlameBufferResponse> {
2130 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2131 let version = deserialize_version(&envelope.payload.version);
2132 let buffer = this.read_with(&cx, |this, cx| {
2133 this.buffer_store.read(cx).get_existing(buffer_id)
2134 })??;
2135 buffer
2136 .update(&mut cx, |buffer, _| {
2137 buffer.wait_for_version(version.clone())
2138 })?
2139 .await?;
2140 let blame = this
2141 .update(&mut cx, |this, cx| {
2142 this.blame_buffer(&buffer, Some(version), cx)
2143 })?
2144 .await?;
2145 Ok(serialize_blame_buffer_response(blame))
2146 }
2147
2148 async fn handle_get_permalink_to_line(
2149 this: Entity<Self>,
2150 envelope: TypedEnvelope<proto::GetPermalinkToLine>,
2151 mut cx: AsyncApp,
2152 ) -> Result<proto::GetPermalinkToLineResponse> {
2153 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2155 let selection = {
2156 let proto_selection = envelope
2157 .payload
2158 .selection
                .context("no selection defined to get permalink for")?;
2160 proto_selection.start as u32..proto_selection.end as u32
2161 };
2162 let buffer = this.read_with(&cx, |this, cx| {
2163 this.buffer_store.read(cx).get_existing(buffer_id)
2164 })??;
2165 let permalink = this
2166 .update(&mut cx, |this, cx| {
2167 this.get_permalink_to_line(&buffer, selection, cx)
2168 })?
2169 .await?;
2170 Ok(proto::GetPermalinkToLineResponse {
2171 permalink: permalink.to_string(),
2172 })
2173 }
2174
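    /// Resolves the repository entity referenced by an incoming request, returning an
    /// error if it is no longer tracked by this store.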
2175 fn repository_for_request(
2176 this: &Entity<Self>,
2177 id: RepositoryId,
2178 cx: &mut AsyncApp,
2179 ) -> Result<Entity<Repository>> {
2180 this.read_with(cx, |this, _| {
2181 this.repositories
2182 .get(&id)
2183 .context("missing repository handle")
2184 .cloned()
2185 })?
2186 }
2187
2188 pub fn repo_snapshots(&self, cx: &App) -> HashMap<RepositoryId, RepositorySnapshot> {
2189 self.repositories
2190 .iter()
2191 .map(|(id, repo)| (*id, repo.read(cx).snapshot.clone()))
2192 .collect()
2193 }
2194}
2195
2196impl BufferGitState {
2197 fn new(_git_store: WeakEntity<GitStore>) -> Self {
2198 Self {
2199 unstaged_diff: Default::default(),
2200 uncommitted_diff: Default::default(),
2201 recalculate_diff_task: Default::default(),
2202 language: Default::default(),
2203 language_registry: Default::default(),
2204 recalculating_tx: postage::watch::channel_with(false).0,
2205 hunk_staging_operation_count: 0,
2206 hunk_staging_operation_count_as_of_write: 0,
2207 head_text: Default::default(),
2208 index_text: Default::default(),
2209 head_changed: Default::default(),
2210 index_changed: Default::default(),
2211 language_changed: Default::default(),
2212 conflict_updated_futures: Default::default(),
2213 conflict_set: Default::default(),
2214 reparse_conflict_markers_task: Default::default(),
2215 }
2216 }
2217
2218 fn buffer_language_changed(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) {
2219 self.language = buffer.read(cx).language().cloned();
2220 self.language_changed = true;
2221 let _ = self.recalculate_diffs(buffer.read(cx).text_snapshot(), cx);
2222 }
2223
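    /// Re-parses conflict markers for the given buffer snapshot on a background task
    /// and applies the result to the associated `ConflictSet`, notifying the returned
    /// receiver once the update lands. If there is no conflict set, or no conflicts
    /// were previously detected, no reparse is scheduled.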
2224 fn reparse_conflict_markers(
2225 &mut self,
2226 buffer: text::BufferSnapshot,
2227 cx: &mut Context<Self>,
2228 ) -> oneshot::Receiver<()> {
2229 let (tx, rx) = oneshot::channel();
2230
2231 let Some(conflict_set) = self
2232 .conflict_set
2233 .as_ref()
2234 .and_then(|conflict_set| conflict_set.upgrade())
2235 else {
2236 return rx;
2237 };
2238
2239 let old_snapshot = conflict_set.read_with(cx, |conflict_set, _| {
2240 if conflict_set.has_conflict {
2241 Some(conflict_set.snapshot())
2242 } else {
2243 None
2244 }
2245 });
2246
2247 if let Some(old_snapshot) = old_snapshot {
2248 self.conflict_updated_futures.push(tx);
2249 self.reparse_conflict_markers_task = Some(cx.spawn(async move |this, cx| {
2250 let (snapshot, changed_range) = cx
2251 .background_spawn(async move {
2252 let new_snapshot = ConflictSet::parse(&buffer);
2253 let changed_range = old_snapshot.compare(&new_snapshot, &buffer);
2254 (new_snapshot, changed_range)
2255 })
2256 .await;
2257 this.update(cx, |this, cx| {
2258 if let Some(conflict_set) = &this.conflict_set {
2259 conflict_set
2260 .update(cx, |conflict_set, cx| {
2261 conflict_set.set_snapshot(snapshot, changed_range, cx);
2262 })
2263 .ok();
2264 }
2265 let futures = std::mem::take(&mut this.conflict_updated_futures);
2266 for tx in futures {
2267 tx.send(()).ok();
2268 }
2269 })
2270 }))
2271 }
2272
2273 rx
2274 }
2275
2276 fn unstaged_diff(&self) -> Option<Entity<BufferDiff>> {
2277 self.unstaged_diff.as_ref().and_then(|set| set.upgrade())
2278 }
2279
2280 fn uncommitted_diff(&self) -> Option<Entity<BufferDiff>> {
2281 self.uncommitted_diff.as_ref().and_then(|set| set.upgrade())
2282 }
2283
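    /// Applies base texts received in an `UpdateDiffBases` message, mapping the
    /// protobuf mode onto the corresponding `DiffBasesChange` before recalculating.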
2284 fn handle_base_texts_updated(
2285 &mut self,
2286 buffer: text::BufferSnapshot,
2287 message: proto::UpdateDiffBases,
2288 cx: &mut Context<Self>,
2289 ) {
2290 use proto::update_diff_bases::Mode;
2291
2292 let Some(mode) = Mode::from_i32(message.mode) else {
2293 return;
2294 };
2295
2296 let diff_bases_change = match mode {
2297 Mode::HeadOnly => DiffBasesChange::SetHead(message.committed_text),
2298 Mode::IndexOnly => DiffBasesChange::SetIndex(message.staged_text),
2299 Mode::IndexMatchesHead => DiffBasesChange::SetBoth(message.committed_text),
2300 Mode::IndexAndHead => DiffBasesChange::SetEach {
2301 index: message.staged_text,
2302 head: message.committed_text,
2303 },
2304 };
2305
2306 self.diff_bases_changed(buffer, Some(diff_bases_change), cx);
2307 }
2308
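    /// Returns a future that resolves once the in-progress diff recalculation (if any)
    /// has finished, or `None` when no recalculation is currently running.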
2309 pub fn wait_for_recalculation(&mut self) -> Option<impl Future<Output = ()> + use<>> {
2310 if *self.recalculating_tx.borrow() {
2311 let mut rx = self.recalculating_tx.subscribe();
2312 return Some(async move {
2313 loop {
2314 let is_recalculating = rx.recv().await;
2315 if is_recalculating != Some(true) {
2316 break;
2317 }
2318 }
2319 });
2320 } else {
2321 None
2322 }
2323 }
2324
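    /// Stores updated head and/or index base texts, normalizing their line endings,
    /// and then recalculates the affected diffs.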
2325 fn diff_bases_changed(
2326 &mut self,
2327 buffer: text::BufferSnapshot,
2328 diff_bases_change: Option<DiffBasesChange>,
2329 cx: &mut Context<Self>,
2330 ) {
2331 match diff_bases_change {
2332 Some(DiffBasesChange::SetIndex(index)) => {
2333 self.index_text = index.map(|mut index| {
2334 text::LineEnding::normalize(&mut index);
2335 Arc::new(index)
2336 });
2337 self.index_changed = true;
2338 }
2339 Some(DiffBasesChange::SetHead(head)) => {
2340 self.head_text = head.map(|mut head| {
2341 text::LineEnding::normalize(&mut head);
2342 Arc::new(head)
2343 });
2344 self.head_changed = true;
2345 }
2346 Some(DiffBasesChange::SetBoth(text)) => {
2347 let text = text.map(|mut text| {
2348 text::LineEnding::normalize(&mut text);
2349 Arc::new(text)
2350 });
2351 self.head_text = text.clone();
2352 self.index_text = text;
2353 self.head_changed = true;
2354 self.index_changed = true;
2355 }
2356 Some(DiffBasesChange::SetEach { index, head }) => {
2357 self.index_text = index.map(|mut index| {
2358 text::LineEnding::normalize(&mut index);
2359 Arc::new(index)
2360 });
2361 self.index_changed = true;
2362 self.head_text = head.map(|mut head| {
2363 text::LineEnding::normalize(&mut head);
2364 Arc::new(head)
2365 });
2366 self.head_changed = true;
2367 }
2368 None => {}
2369 }
2370
2371 self.recalculate_diffs(buffer, cx)
2372 }
2373
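    /// Recomputes the unstaged and uncommitted diffs against the current index and
    /// head texts on a background task. When the index and head texts are identical,
    /// the unstaged diff is reused for the uncommitted diff. The recalculation is
    /// abandoned if further hunk staging operations occur while it is in flight.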
2374 fn recalculate_diffs(&mut self, buffer: text::BufferSnapshot, cx: &mut Context<Self>) {
2375 *self.recalculating_tx.borrow_mut() = true;
2376
2377 let language = self.language.clone();
2378 let language_registry = self.language_registry.clone();
2379 let unstaged_diff = self.unstaged_diff();
2380 let uncommitted_diff = self.uncommitted_diff();
2381 let head = self.head_text.clone();
2382 let index = self.index_text.clone();
2383 let index_changed = self.index_changed;
2384 let head_changed = self.head_changed;
2385 let language_changed = self.language_changed;
2386 let prev_hunk_staging_operation_count = self.hunk_staging_operation_count_as_of_write;
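        // `SetBoth` stores the same `Arc` in both `head_text` and `index_text`, so
        // pointer equality is sufficient to detect that the index matches the head.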
2387 let index_matches_head = match (self.index_text.as_ref(), self.head_text.as_ref()) {
2388 (Some(index), Some(head)) => Arc::ptr_eq(index, head),
2389 (None, None) => true,
2390 _ => false,
2391 };
2392 self.recalculate_diff_task = Some(cx.spawn(async move |this, cx| {
2393 log::debug!(
2394 "start recalculating diffs for buffer {}",
2395 buffer.remote_id()
2396 );
2397
2398 let mut new_unstaged_diff = None;
2399 if let Some(unstaged_diff) = &unstaged_diff {
2400 new_unstaged_diff = Some(
2401 BufferDiff::update_diff(
2402 unstaged_diff.clone(),
2403 buffer.clone(),
2404 index,
2405 index_changed,
2406 language_changed,
2407 language.clone(),
2408 language_registry.clone(),
2409 cx,
2410 )
2411 .await?,
2412 );
2413 }
2414
2415 let mut new_uncommitted_diff = None;
2416 if let Some(uncommitted_diff) = &uncommitted_diff {
2417 new_uncommitted_diff = if index_matches_head {
2418 new_unstaged_diff.clone()
2419 } else {
2420 Some(
2421 BufferDiff::update_diff(
2422 uncommitted_diff.clone(),
2423 buffer.clone(),
2424 head,
2425 head_changed,
2426 language_changed,
2427 language.clone(),
2428 language_registry.clone(),
2429 cx,
2430 )
2431 .await?,
2432 )
2433 }
2434 }
2435
2436 let cancel = this.update(cx, |this, _| {
2437 // This checks whether all pending stage/unstage operations
2438 // have quiesced (i.e. both the corresponding write and the
2439 // read of that write have completed). If not, then we cancel
2440 // this recalculation attempt to avoid invalidating pending
2441 // state too quickly; another recalculation will come along
2442 // later and clear the pending state once the state of the index has settled.
2443 if this.hunk_staging_operation_count > prev_hunk_staging_operation_count {
2444 *this.recalculating_tx.borrow_mut() = false;
2445 true
2446 } else {
2447 false
2448 }
2449 })?;
2450 if cancel {
2451 log::debug!(
                concat!(
                    "aborting recalculating diffs for buffer {} ",
                    "due to subsequent hunk operations",
                ),
2456 buffer.remote_id()
2457 );
2458 return Ok(());
2459 }
2460
2461 let unstaged_changed_range = if let Some((unstaged_diff, new_unstaged_diff)) =
2462 unstaged_diff.as_ref().zip(new_unstaged_diff.clone())
2463 {
2464 unstaged_diff.update(cx, |diff, cx| {
2465 if language_changed {
2466 diff.language_changed(cx);
2467 }
2468 diff.set_snapshot(new_unstaged_diff, &buffer, cx)
2469 })?
2470 } else {
2471 None
2472 };
2473
2474 if let Some((uncommitted_diff, new_uncommitted_diff)) =
2475 uncommitted_diff.as_ref().zip(new_uncommitted_diff.clone())
2476 {
2477 uncommitted_diff.update(cx, |diff, cx| {
2478 if language_changed {
2479 diff.language_changed(cx);
2480 }
2481 diff.set_snapshot_with_secondary(
2482 new_uncommitted_diff,
2483 &buffer,
2484 unstaged_changed_range,
2485 true,
2486 cx,
2487 );
2488 })?;
2489 }
2490
2491 log::debug!(
2492 "finished recalculating diffs for buffer {}",
2493 buffer.remote_id()
2494 );
2495
2496 if let Some(this) = this.upgrade() {
2497 this.update(cx, |this, _| {
2498 this.index_changed = false;
2499 this.head_changed = false;
2500 this.language_changed = false;
2501 *this.recalculating_tx.borrow_mut() = false;
2502 })?;
2503 }
2504
2505 Ok(())
2506 }));
2507 }
2508}
2509
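/// Builds an `AskPassDelegate` that forwards credential prompts from a local git
/// operation to the downstream client over RPC and relays the response back.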
2510fn make_remote_delegate(
2511 this: Entity<GitStore>,
2512 project_id: u64,
2513 repository_id: RepositoryId,
2514 askpass_id: u64,
2515 cx: &mut AsyncApp,
2516) -> AskPassDelegate {
2517 AskPassDelegate::new(cx, move |prompt, tx, cx| {
2518 this.update(cx, |this, cx| {
2519 let Some((client, _)) = this.downstream_client() else {
2520 return;
2521 };
2522 let response = client.request(proto::AskPassRequest {
2523 project_id,
2524 repository_id: repository_id.to_proto(),
2525 askpass_id,
2526 prompt,
2527 });
2528 cx.spawn(async move |_, _| {
2529 tx.send(response.await?.response).ok();
2530 anyhow::Ok(())
2531 })
2532 .detach_and_log_err(cx);
2533 })
2534 .log_err();
2535 })
2536}
2537
2538impl RepositoryId {
2539 pub fn to_proto(self) -> u64 {
2540 self.0
2541 }
2542
2543 pub fn from_proto(id: u64) -> Self {
2544 RepositoryId(id)
2545 }
2546}
2547
2548impl RepositorySnapshot {
2549 fn empty(id: RepositoryId, work_directory_abs_path: Arc<Path>) -> Self {
2550 Self {
2551 id,
2552 statuses_by_path: Default::default(),
2553 work_directory_abs_path,
2554 branch: None,
2555 head_commit: None,
2556 scan_id: 0,
2557 merge: Default::default(),
2558 }
2559 }
2560
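    /// Builds an `UpdateRepository` message describing this repository's complete
    /// state, suitable for the first update sent to a downstream client.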
2561 fn initial_update(&self, project_id: u64) -> proto::UpdateRepository {
2562 proto::UpdateRepository {
2563 branch_summary: self.branch.as_ref().map(branch_to_proto),
2564 head_commit_details: self.head_commit.as_ref().map(commit_details_to_proto),
2565 updated_statuses: self
2566 .statuses_by_path
2567 .iter()
2568 .map(|entry| entry.to_proto())
2569 .collect(),
2570 removed_statuses: Default::default(),
2571 current_merge_conflicts: self
2572 .merge
2573 .conflicted_paths
2574 .iter()
2575 .map(|repo_path| repo_path.to_proto())
2576 .collect(),
2577 project_id,
2578 id: self.id.to_proto(),
2579 abs_path: self.work_directory_abs_path.to_proto(),
2580 entry_ids: vec![self.id.to_proto()],
2581 scan_id: self.scan_id,
2582 is_last_update: true,
2583 }
2584 }
2585
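    /// Computes an incremental `UpdateRepository` message by walking the old and new
    /// status trees in parallel and recording which entries were added, changed, or
    /// removed, along with the current branch, head commit, and merge conflicts.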
2586 fn build_update(&self, old: &Self, project_id: u64) -> proto::UpdateRepository {
2587 let mut updated_statuses: Vec<proto::StatusEntry> = Vec::new();
2588 let mut removed_statuses: Vec<String> = Vec::new();
2589
2590 let mut new_statuses = self.statuses_by_path.iter().peekable();
2591 let mut old_statuses = old.statuses_by_path.iter().peekable();
2592
2593 let mut current_new_entry = new_statuses.next();
2594 let mut current_old_entry = old_statuses.next();
2595 loop {
2596 match (current_new_entry, current_old_entry) {
2597 (Some(new_entry), Some(old_entry)) => {
2598 match new_entry.repo_path.cmp(&old_entry.repo_path) {
2599 Ordering::Less => {
2600 updated_statuses.push(new_entry.to_proto());
2601 current_new_entry = new_statuses.next();
2602 }
2603 Ordering::Equal => {
2604 if new_entry.status != old_entry.status {
2605 updated_statuses.push(new_entry.to_proto());
2606 }
2607 current_old_entry = old_statuses.next();
2608 current_new_entry = new_statuses.next();
2609 }
2610 Ordering::Greater => {
2611 removed_statuses.push(old_entry.repo_path.as_ref().to_proto());
2612 current_old_entry = old_statuses.next();
2613 }
2614 }
2615 }
2616 (None, Some(old_entry)) => {
2617 removed_statuses.push(old_entry.repo_path.as_ref().to_proto());
2618 current_old_entry = old_statuses.next();
2619 }
2620 (Some(new_entry), None) => {
2621 updated_statuses.push(new_entry.to_proto());
2622 current_new_entry = new_statuses.next();
2623 }
2624 (None, None) => break,
2625 }
2626 }
2627
2628 proto::UpdateRepository {
2629 branch_summary: self.branch.as_ref().map(branch_to_proto),
2630 head_commit_details: self.head_commit.as_ref().map(commit_details_to_proto),
2631 updated_statuses,
2632 removed_statuses,
2633 current_merge_conflicts: self
2634 .merge
2635 .conflicted_paths
2636 .iter()
2637 .map(|path| path.as_ref().to_proto())
2638 .collect(),
2639 project_id,
2640 id: self.id.to_proto(),
2641 abs_path: self.work_directory_abs_path.to_proto(),
2642 entry_ids: vec![],
2643 scan_id: self.scan_id,
2644 is_last_update: true,
2645 }
2646 }
2647
2648 pub fn status(&self) -> impl Iterator<Item = StatusEntry> + '_ {
2649 self.statuses_by_path.iter().cloned()
2650 }
2651
2652 pub fn status_summary(&self) -> GitSummary {
2653 self.statuses_by_path.summary().item_summary
2654 }
2655
2656 pub fn status_for_path(&self, path: &RepoPath) -> Option<StatusEntry> {
2657 self.statuses_by_path
2658 .get(&PathKey(path.0.clone()), &())
2659 .cloned()
2660 }
2661
2662 pub fn abs_path_to_repo_path(&self, abs_path: &Path) -> Option<RepoPath> {
2663 abs_path
2664 .strip_prefix(&self.work_directory_abs_path)
2665 .map(RepoPath::from)
2666 .ok()
2667 }
2668
    pub fn had_conflict_on_last_merge_head_change(&self, repo_path: &RepoPath) -> bool {
        self.merge.conflicted_paths.contains(repo_path)
    }

    pub fn has_conflict(&self, repo_path: &RepoPath) -> bool {
        let had_conflict_on_last_merge_head_change =
            self.merge.conflicted_paths.contains(repo_path);
        let has_conflict_currently = self
            .status_for_path(repo_path)
            .map_or(false, |entry| entry.status.is_conflicted());
        had_conflict_on_last_merge_head_change || has_conflict_currently
    }
2681
2682 /// This is the name that will be displayed in the repository selector for this repository.
2683 pub fn display_name(&self) -> SharedString {
2684 self.work_directory_abs_path
2685 .file_name()
2686 .unwrap_or_default()
2687 .to_string_lossy()
2688 .to_string()
2689 .into()
2690 }
2691}
2692
2693impl MergeDetails {
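    /// Reads the repository's merge-related state: the in-progress merge message, the
    /// pseudo-refs (MERGE_HEAD, CHERRY_PICK_HEAD, and so on), and the set of
    /// conflicted paths. Returns the new details along with whether the merge heads
    /// changed since the previous snapshot.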
2694 async fn load(
2695 backend: &Arc<dyn GitRepository>,
2696 status: &SumTree<StatusEntry>,
2697 prev_snapshot: &RepositorySnapshot,
2698 ) -> Result<(MergeDetails, bool)> {
2699 log::debug!("load merge details");
2700 let message = backend.merge_message().await;
2701 let heads = backend
2702 .revparse_batch(vec![
2703 "MERGE_HEAD".into(),
2704 "CHERRY_PICK_HEAD".into(),
2705 "REBASE_HEAD".into(),
2706 "REVERT_HEAD".into(),
2707 "APPLY_HEAD".into(),
2708 ])
2709 .await
2710 .log_err()
2711 .unwrap_or_default()
2712 .into_iter()
2713 .map(|opt| opt.map(SharedString::from))
2714 .collect::<Vec<_>>();
2715 let merge_heads_changed = heads != prev_snapshot.merge.heads;
2716 let conflicted_paths = if merge_heads_changed {
2717 let current_conflicted_paths = TreeSet::from_ordered_entries(
2718 status
2719 .iter()
2720 .filter(|entry| entry.status.is_conflicted())
2721 .map(|entry| entry.repo_path.clone()),
2722 );
2723
            // It can happen that we run a scan while a lengthy merge is in progress
            // that will eventually produce conflicts, but before those conflicts are
            // reported by `git status`. Since we currently only track the merge heads
            // state in order to detect conflicts, don't update it until some conflicts
            // actually appear.
2729 if heads.iter().any(Option::is_some)
2730 && !prev_snapshot.merge.heads.iter().any(Option::is_some)
2731 && current_conflicted_paths.is_empty()
2732 {
2733 log::debug!("not updating merge heads because no conflicts found");
2734 return Ok((
2735 MergeDetails {
2736 message: message.map(SharedString::from),
2737 ..prev_snapshot.merge.clone()
2738 },
2739 false,
2740 ));
2741 }
2742
2743 current_conflicted_paths
2744 } else {
2745 prev_snapshot.merge.conflicted_paths.clone()
2746 };
2747 let details = MergeDetails {
2748 conflicted_paths,
2749 message: message.map(SharedString::from),
2750 heads,
2751 };
2752 Ok((details, merge_heads_changed))
2753 }
2754}
2755
2756impl Repository {
2757 pub fn snapshot(&self) -> RepositorySnapshot {
2758 self.snapshot.clone()
2759 }
2760
2761 fn local(
2762 id: RepositoryId,
2763 work_directory_abs_path: Arc<Path>,
2764 dot_git_abs_path: Arc<Path>,
2765 repository_dir_abs_path: Arc<Path>,
2766 common_dir_abs_path: Arc<Path>,
2767 project_environment: WeakEntity<ProjectEnvironment>,
2768 fs: Arc<dyn Fs>,
2769 git_store: WeakEntity<GitStore>,
2770 cx: &mut Context<Self>,
2771 ) -> Self {
2772 let snapshot = RepositorySnapshot::empty(id, work_directory_abs_path.clone());
2773 Repository {
2774 this: cx.weak_entity(),
2775 git_store,
2776 snapshot,
2777 commit_message_buffer: None,
2778 askpass_delegates: Default::default(),
2779 paths_needing_status_update: Default::default(),
2780 latest_askpass_id: 0,
2781 job_sender: Repository::spawn_local_git_worker(
2782 work_directory_abs_path,
2783 dot_git_abs_path,
2784 repository_dir_abs_path,
2785 common_dir_abs_path,
2786 project_environment,
2787 fs,
2788 cx,
2789 ),
2790 job_id: 0,
2791 active_jobs: Default::default(),
2792 }
2793 }
2794
2795 fn remote(
2796 id: RepositoryId,
2797 work_directory_abs_path: Arc<Path>,
2798 project_id: ProjectId,
2799 client: AnyProtoClient,
2800 git_store: WeakEntity<GitStore>,
2801 cx: &mut Context<Self>,
2802 ) -> Self {
2803 let snapshot = RepositorySnapshot::empty(id, work_directory_abs_path);
2804 Self {
2805 this: cx.weak_entity(),
2806 snapshot,
2807 commit_message_buffer: None,
2808 git_store,
2809 paths_needing_status_update: Default::default(),
2810 job_sender: Self::spawn_remote_git_worker(project_id, client, cx),
2811 askpass_delegates: Default::default(),
2812 latest_askpass_id: 0,
2813 active_jobs: Default::default(),
2814 job_id: 0,
2815 }
2816 }
2817
2818 pub fn git_store(&self) -> Option<Entity<GitStore>> {
2819 self.git_store.upgrade()
2820 }
2821
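    /// Reloads the index and head texts for every open buffer that belongs to this
    /// repository and forwards any changes to the corresponding diff state (and to
    /// downstream clients, when sharing).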
2822 fn reload_buffer_diff_bases(&mut self, cx: &mut Context<Self>) {
2823 let this = cx.weak_entity();
2824 let git_store = self.git_store.clone();
2825 let _ = self.send_keyed_job(
2826 Some(GitJobKey::ReloadBufferDiffBases),
2827 None,
2828 |state, mut cx| async move {
2829 let RepositoryState::Local { backend, .. } = state else {
2830 log::error!("tried to recompute diffs for a non-local repository");
2831 return Ok(());
2832 };
2833
2834 let Some(this) = this.upgrade() else {
2835 return Ok(());
2836 };
2837
2838 let repo_diff_state_updates = this.update(&mut cx, |this, cx| {
2839 git_store.update(cx, |git_store, cx| {
2840 git_store
2841 .diffs
2842 .iter()
2843 .filter_map(|(buffer_id, diff_state)| {
2844 let buffer_store = git_store.buffer_store.read(cx);
2845 let buffer = buffer_store.get(*buffer_id)?;
2846 let file = File::from_dyn(buffer.read(cx).file())?;
2847 let abs_path =
2848 file.worktree.read(cx).absolutize(&file.path).ok()?;
2849 let repo_path = this.abs_path_to_repo_path(&abs_path)?;
2850 log::debug!(
2851 "start reload diff bases for repo path {}",
2852 repo_path.0.display()
2853 );
2854 diff_state.update(cx, |diff_state, _| {
2855 let has_unstaged_diff = diff_state
2856 .unstaged_diff
2857 .as_ref()
2858 .is_some_and(|diff| diff.is_upgradable());
2859 let has_uncommitted_diff = diff_state
2860 .uncommitted_diff
2861 .as_ref()
2862 .is_some_and(|set| set.is_upgradable());
2863
2864 Some((
2865 buffer,
2866 repo_path,
2867 has_unstaged_diff.then(|| diff_state.index_text.clone()),
2868 has_uncommitted_diff.then(|| diff_state.head_text.clone()),
2869 ))
2870 })
2871 })
2872 .collect::<Vec<_>>()
2873 })
2874 })??;
2875
2876 let buffer_diff_base_changes = cx
2877 .background_spawn(async move {
2878 let mut changes = Vec::new();
2879 for (buffer, repo_path, current_index_text, current_head_text) in
2880 &repo_diff_state_updates
2881 {
2882 let index_text = if current_index_text.is_some() {
2883 backend.load_index_text(repo_path.clone()).await
2884 } else {
2885 None
2886 };
2887 let head_text = if current_head_text.is_some() {
2888 backend.load_committed_text(repo_path.clone()).await
2889 } else {
2890 None
2891 };
2892
2893 let change =
2894 match (current_index_text.as_ref(), current_head_text.as_ref()) {
2895 (Some(current_index), Some(current_head)) => {
2896 let index_changed =
2897 index_text.as_ref() != current_index.as_deref();
2898 let head_changed =
2899 head_text.as_ref() != current_head.as_deref();
2900 if index_changed && head_changed {
2901 if index_text == head_text {
2902 Some(DiffBasesChange::SetBoth(head_text))
2903 } else {
2904 Some(DiffBasesChange::SetEach {
2905 index: index_text,
2906 head: head_text,
2907 })
2908 }
2909 } else if index_changed {
2910 Some(DiffBasesChange::SetIndex(index_text))
2911 } else if head_changed {
2912 Some(DiffBasesChange::SetHead(head_text))
2913 } else {
2914 None
2915 }
2916 }
2917 (Some(current_index), None) => {
2918 let index_changed =
2919 index_text.as_ref() != current_index.as_deref();
2920 index_changed
2921 .then_some(DiffBasesChange::SetIndex(index_text))
2922 }
2923 (None, Some(current_head)) => {
2924 let head_changed =
2925 head_text.as_ref() != current_head.as_deref();
2926 head_changed.then_some(DiffBasesChange::SetHead(head_text))
2927 }
2928 (None, None) => None,
2929 };
2930
2931 changes.push((buffer.clone(), change))
2932 }
2933 changes
2934 })
2935 .await;
2936
2937 git_store.update(&mut cx, |git_store, cx| {
2938 for (buffer, diff_bases_change) in buffer_diff_base_changes {
2939 let buffer_snapshot = buffer.read(cx).text_snapshot();
2940 let buffer_id = buffer_snapshot.remote_id();
2941 let Some(diff_state) = git_store.diffs.get(&buffer_id) else {
2942 continue;
2943 };
2944
2945 let downstream_client = git_store.downstream_client();
2946 diff_state.update(cx, |diff_state, cx| {
2947 use proto::update_diff_bases::Mode;
2948
2949 if let Some((diff_bases_change, (client, project_id))) =
2950 diff_bases_change.clone().zip(downstream_client)
2951 {
2952 let (staged_text, committed_text, mode) = match diff_bases_change {
2953 DiffBasesChange::SetIndex(index) => {
2954 (index, None, Mode::IndexOnly)
2955 }
2956 DiffBasesChange::SetHead(head) => (None, head, Mode::HeadOnly),
2957 DiffBasesChange::SetEach { index, head } => {
2958 (index, head, Mode::IndexAndHead)
2959 }
2960 DiffBasesChange::SetBoth(text) => {
2961 (None, text, Mode::IndexMatchesHead)
2962 }
2963 };
2964 client
2965 .send(proto::UpdateDiffBases {
2966 project_id: project_id.to_proto(),
2967 buffer_id: buffer_id.to_proto(),
2968 staged_text,
2969 committed_text,
2970 mode: mode as i32,
2971 })
2972 .log_err();
2973 }
2974
2975 diff_state.diff_bases_changed(buffer_snapshot, diff_bases_change, cx);
2976 });
2977 }
2978 })
2979 },
2980 );
2981 }
2982
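    /// Enqueues an unkeyed job on this repository's worker queue; see
    /// [`Self::send_keyed_job`] for details.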
2983 pub fn send_job<F, Fut, R>(
2984 &mut self,
2985 status: Option<SharedString>,
2986 job: F,
2987 ) -> oneshot::Receiver<R>
2988 where
2989 F: FnOnce(RepositoryState, AsyncApp) -> Fut + 'static,
2990 Fut: Future<Output = R> + 'static,
2991 R: Send + 'static,
2992 {
2993 self.send_keyed_job(None, status, job)
2994 }
2995
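    /// Enqueues a job on this repository's worker queue. The optional key identifies
    /// related jobs (for example, successive index writes for the same path), and the
    /// optional status message is recorded in `active_jobs` for the duration of the
    /// job so it can be surfaced to observers.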
2996 fn send_keyed_job<F, Fut, R>(
2997 &mut self,
2998 key: Option<GitJobKey>,
2999 status: Option<SharedString>,
3000 job: F,
3001 ) -> oneshot::Receiver<R>
3002 where
3003 F: FnOnce(RepositoryState, AsyncApp) -> Fut + 'static,
3004 Fut: Future<Output = R> + 'static,
3005 R: Send + 'static,
3006 {
3007 let (result_tx, result_rx) = futures::channel::oneshot::channel();
3008 let job_id = post_inc(&mut self.job_id);
3009 let this = self.this.clone();
3010 self.job_sender
3011 .unbounded_send(GitJob {
3012 key,
3013 job: Box::new(move |state, cx: &mut AsyncApp| {
3014 let job = job(state, cx.clone());
3015 cx.spawn(async move |cx| {
3016 if let Some(s) = status.clone() {
3017 this.update(cx, |this, cx| {
3018 this.active_jobs.insert(
3019 job_id,
3020 JobInfo {
3021 start: Instant::now(),
3022 message: s.clone(),
3023 },
3024 );
3025
3026 cx.notify();
3027 })
3028 .ok();
3029 }
3030 let result = job.await;
3031
3032 this.update(cx, |this, cx| {
3033 this.active_jobs.remove(&job_id);
3034 cx.notify();
3035 })
3036 .ok();
3037
3038 result_tx.send(result).ok();
3039 })
3040 }),
3041 })
3042 .ok();
3043 result_rx
3044 }
3045
3046 pub fn set_as_active_repository(&self, cx: &mut Context<Self>) {
3047 let Some(git_store) = self.git_store.upgrade() else {
3048 return;
3049 };
3050 let entity = cx.entity();
3051 git_store.update(cx, |git_store, cx| {
3052 let Some((&id, _)) = git_store
3053 .repositories
3054 .iter()
3055 .find(|(_, handle)| *handle == &entity)
3056 else {
3057 return;
3058 };
3059 git_store.active_repo_id = Some(id);
3060 cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
3061 });
3062 }
3063
3064 pub fn cached_status(&self) -> impl '_ + Iterator<Item = StatusEntry> {
3065 self.snapshot.status()
3066 }
3067
3068 pub fn repo_path_to_project_path(&self, path: &RepoPath, cx: &App) -> Option<ProjectPath> {
3069 let git_store = self.git_store.upgrade()?;
3070 let worktree_store = git_store.read(cx).worktree_store.read(cx);
3071 let abs_path = self.snapshot.work_directory_abs_path.join(&path.0);
3072 let (worktree, relative_path) = worktree_store.find_worktree(abs_path, cx)?;
3073 Some(ProjectPath {
3074 worktree_id: worktree.read(cx).id(),
3075 path: relative_path.into(),
3076 })
3077 }
3078
3079 pub fn project_path_to_repo_path(&self, path: &ProjectPath, cx: &App) -> Option<RepoPath> {
3080 let git_store = self.git_store.upgrade()?;
3081 let worktree_store = git_store.read(cx).worktree_store.read(cx);
3082 let abs_path = worktree_store.absolutize(path, cx)?;
3083 self.snapshot.abs_path_to_repo_path(&abs_path)
3084 }
3085
3086 pub fn contains_sub_repo(&self, other: &Entity<Self>, cx: &App) -> bool {
3087 other
3088 .read(cx)
3089 .snapshot
3090 .work_directory_abs_path
3091 .starts_with(&self.snapshot.work_directory_abs_path)
3092 }
3093
3094 pub fn open_commit_buffer(
3095 &mut self,
3096 languages: Option<Arc<LanguageRegistry>>,
3097 buffer_store: Entity<BufferStore>,
3098 cx: &mut Context<Self>,
3099 ) -> Task<Result<Entity<Buffer>>> {
3100 let id = self.id;
3101 if let Some(buffer) = self.commit_message_buffer.clone() {
3102 return Task::ready(Ok(buffer));
3103 }
3104 let this = cx.weak_entity();
3105
3106 let rx = self.send_job(None, move |state, mut cx| async move {
3107 let Some(this) = this.upgrade() else {
3108 bail!("git store was dropped");
3109 };
3110 match state {
3111 RepositoryState::Local { .. } => {
3112 this.update(&mut cx, |_, cx| {
3113 Self::open_local_commit_buffer(languages, buffer_store, cx)
3114 })?
3115 .await
3116 }
3117 RepositoryState::Remote { project_id, client } => {
3118 let request = client.request(proto::OpenCommitMessageBuffer {
3119 project_id: project_id.0,
3120 repository_id: id.to_proto(),
3121 });
3122 let response = request.await.context("requesting to open commit buffer")?;
3123 let buffer_id = BufferId::new(response.buffer_id)?;
3124 let buffer = buffer_store
3125 .update(&mut cx, |buffer_store, cx| {
3126 buffer_store.wait_for_remote_buffer(buffer_id, cx)
3127 })?
3128 .await?;
3129 if let Some(language_registry) = languages {
3130 let git_commit_language =
3131 language_registry.language_for_name("Git Commit").await?;
3132 buffer.update(&mut cx, |buffer, cx| {
3133 buffer.set_language(Some(git_commit_language), cx);
3134 })?;
3135 }
3136 this.update(&mut cx, |this, _| {
3137 this.commit_message_buffer = Some(buffer.clone());
3138 })?;
3139 Ok(buffer)
3140 }
3141 }
3142 });
3143
3144 cx.spawn(|_, _: &mut AsyncApp| async move { rx.await? })
3145 }
3146
3147 fn open_local_commit_buffer(
3148 language_registry: Option<Arc<LanguageRegistry>>,
3149 buffer_store: Entity<BufferStore>,
3150 cx: &mut Context<Self>,
3151 ) -> Task<Result<Entity<Buffer>>> {
3152 cx.spawn(async move |repository, cx| {
3153 let buffer = buffer_store
3154 .update(cx, |buffer_store, cx| buffer_store.create_buffer(cx))?
3155 .await?;
3156
3157 if let Some(language_registry) = language_registry {
3158 let git_commit_language = language_registry.language_for_name("Git Commit").await?;
3159 buffer.update(cx, |buffer, cx| {
3160 buffer.set_language(Some(git_commit_language), cx);
3161 })?;
3162 }
3163
3164 repository.update(cx, |repository, _| {
3165 repository.commit_message_buffer = Some(buffer.clone());
3166 })?;
3167 Ok(buffer)
3168 })
3169 }
3170
3171 pub fn checkout_files(
3172 &mut self,
3173 commit: &str,
3174 paths: Vec<RepoPath>,
3175 _cx: &mut App,
3176 ) -> oneshot::Receiver<Result<()>> {
3177 let commit = commit.to_string();
3178 let id = self.id;
3179
3180 self.send_job(
3181 Some(format!("git checkout {}", commit).into()),
3182 move |git_repo, _| async move {
3183 match git_repo {
3184 RepositoryState::Local {
3185 backend,
3186 environment,
3187 ..
3188 } => {
3189 backend
3190 .checkout_files(commit, paths, environment.clone())
3191 .await
3192 }
3193 RepositoryState::Remote { project_id, client } => {
3194 client
3195 .request(proto::GitCheckoutFiles {
3196 project_id: project_id.0,
3197 repository_id: id.to_proto(),
3198 commit,
3199 paths: paths
3200 .into_iter()
3201 .map(|p| p.to_string_lossy().to_string())
3202 .collect(),
3203 })
3204 .await?;
3205
3206 Ok(())
3207 }
3208 }
3209 },
3210 )
3211 }
3212
3213 pub fn reset(
3214 &mut self,
3215 commit: String,
3216 reset_mode: ResetMode,
3217 _cx: &mut App,
3218 ) -> oneshot::Receiver<Result<()>> {
3220 let id = self.id;
3221
3222 self.send_job(None, move |git_repo, _| async move {
3223 match git_repo {
3224 RepositoryState::Local {
3225 backend,
3226 environment,
3227 ..
3228 } => backend.reset(commit, reset_mode, environment).await,
3229 RepositoryState::Remote { project_id, client } => {
3230 client
3231 .request(proto::GitReset {
3232 project_id: project_id.0,
3233 repository_id: id.to_proto(),
3234 commit,
3235 mode: match reset_mode {
3236 ResetMode::Soft => git_reset::ResetMode::Soft.into(),
3237 ResetMode::Mixed => git_reset::ResetMode::Mixed.into(),
3238 },
3239 })
3240 .await?;
3241
3242 Ok(())
3243 }
3244 }
3245 })
3246 }
3247
3248 pub fn show(&mut self, commit: String) -> oneshot::Receiver<Result<CommitDetails>> {
3249 let id = self.id;
3250 self.send_job(None, move |git_repo, _cx| async move {
3251 match git_repo {
3252 RepositoryState::Local { backend, .. } => backend.show(commit).await,
3253 RepositoryState::Remote { project_id, client } => {
3254 let resp = client
3255 .request(proto::GitShow {
3256 project_id: project_id.0,
3257 repository_id: id.to_proto(),
3258 commit,
3259 })
3260 .await?;
3261
3262 Ok(CommitDetails {
3263 sha: resp.sha.into(),
3264 message: resp.message.into(),
3265 commit_timestamp: resp.commit_timestamp,
3266 author_email: resp.author_email.into(),
3267 author_name: resp.author_name.into(),
3268 })
3269 }
3270 }
3271 })
3272 }
3273
3274 pub fn load_commit_diff(&mut self, commit: String) -> oneshot::Receiver<Result<CommitDiff>> {
3275 let id = self.id;
3276 self.send_job(None, move |git_repo, cx| async move {
3277 match git_repo {
3278 RepositoryState::Local { backend, .. } => backend.load_commit(commit, cx).await,
3279 RepositoryState::Remote {
3280 client, project_id, ..
3281 } => {
3282 let response = client
3283 .request(proto::LoadCommitDiff {
3284 project_id: project_id.0,
3285 repository_id: id.to_proto(),
3286 commit,
3287 })
3288 .await?;
3289 Ok(CommitDiff {
3290 files: response
3291 .files
3292 .into_iter()
3293 .map(|file| CommitFile {
3294 path: Path::new(&file.path).into(),
3295 old_text: file.old_text,
3296 new_text: file.new_text,
3297 })
3298 .collect(),
3299 })
3300 }
3301 }
3302 })
3303 }
3304
3305 fn buffer_store(&self, cx: &App) -> Option<Entity<BufferStore>> {
3306 Some(self.git_store.upgrade()?.read(cx).buffer_store.clone())
3307 }
3308
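    /// Stages the given paths. Open buffers for those paths whose files exist on disk
    /// are saved first, so that the staged content reflects the buffer contents.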
3309 pub fn stage_entries(
3310 &self,
3311 entries: Vec<RepoPath>,
3312 cx: &mut Context<Self>,
3313 ) -> Task<anyhow::Result<()>> {
3314 if entries.is_empty() {
3315 return Task::ready(Ok(()));
3316 }
3317 let id = self.id;
3318
3319 let mut save_futures = Vec::new();
3320 if let Some(buffer_store) = self.buffer_store(cx) {
3321 buffer_store.update(cx, |buffer_store, cx| {
3322 for path in &entries {
3323 let Some(project_path) = self.repo_path_to_project_path(path, cx) else {
3324 continue;
3325 };
3326 if let Some(buffer) = buffer_store.get_by_path(&project_path) {
3327 if buffer
3328 .read(cx)
3329 .file()
3330 .map_or(false, |file| file.disk_state().exists())
3331 {
3332 save_futures.push(buffer_store.save_buffer(buffer, cx));
3333 }
3334 }
3335 }
3336 })
3337 }
3338
3339 cx.spawn(async move |this, cx| {
3340 for save_future in save_futures {
3341 save_future.await?;
3342 }
3343
3344 this.update(cx, |this, _| {
3345 this.send_job(None, move |git_repo, _cx| async move {
3346 match git_repo {
3347 RepositoryState::Local {
3348 backend,
3349 environment,
3350 ..
3351 } => backend.stage_paths(entries, environment.clone()).await,
3352 RepositoryState::Remote { project_id, client } => {
3353 client
3354 .request(proto::Stage {
3355 project_id: project_id.0,
3356 repository_id: id.to_proto(),
3357 paths: entries
3358 .into_iter()
3359 .map(|repo_path| repo_path.as_ref().to_proto())
3360 .collect(),
3361 })
3362 .await
3363 .context("sending stage request")?;
3364
3365 Ok(())
3366 }
3367 }
3368 })
3369 })?
3370 .await??;
3371
3372 Ok(())
3373 })
3374 }
3375
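    /// Unstages the given paths, saving any open on-disk buffers for them first, as in
    /// `stage_entries`.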
3376 pub fn unstage_entries(
3377 &self,
3378 entries: Vec<RepoPath>,
3379 cx: &mut Context<Self>,
3380 ) -> Task<anyhow::Result<()>> {
3381 if entries.is_empty() {
3382 return Task::ready(Ok(()));
3383 }
3384 let id = self.id;
3385
3386 let mut save_futures = Vec::new();
3387 if let Some(buffer_store) = self.buffer_store(cx) {
3388 buffer_store.update(cx, |buffer_store, cx| {
3389 for path in &entries {
3390 let Some(project_path) = self.repo_path_to_project_path(path, cx) else {
3391 continue;
3392 };
3393 if let Some(buffer) = buffer_store.get_by_path(&project_path) {
3394 if buffer
3395 .read(cx)
3396 .file()
3397 .map_or(false, |file| file.disk_state().exists())
3398 {
3399 save_futures.push(buffer_store.save_buffer(buffer, cx));
3400 }
3401 }
3402 }
3403 })
3404 }
3405
3406 cx.spawn(async move |this, cx| {
3407 for save_future in save_futures {
3408 save_future.await?;
3409 }
3410
3411 this.update(cx, |this, _| {
3412 this.send_job(None, move |git_repo, _cx| async move {
3413 match git_repo {
3414 RepositoryState::Local {
3415 backend,
3416 environment,
3417 ..
3418 } => backend.unstage_paths(entries, environment).await,
3419 RepositoryState::Remote { project_id, client } => {
3420 client
3421 .request(proto::Unstage {
3422 project_id: project_id.0,
3423 repository_id: id.to_proto(),
3424 paths: entries
3425 .into_iter()
3426 .map(|repo_path| repo_path.as_ref().to_proto())
3427 .collect(),
3428 })
3429 .await
3430 .context("sending unstage request")?;
3431
3432 Ok(())
3433 }
3434 }
3435 })
3436 })?
3437 .await??;
3438
3439 Ok(())
3440 })
3441 }
3442
3443 pub fn stage_all(&self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
3444 let to_stage = self
3445 .cached_status()
3446 .filter(|entry| !entry.status.staging().is_fully_staged())
3447 .map(|entry| entry.repo_path.clone())
3448 .collect();
3449 self.stage_entries(to_stage, cx)
3450 }
3451
3452 pub fn unstage_all(&self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
3453 let to_unstage = self
3454 .cached_status()
3455 .filter(|entry| entry.status.staging().has_staged())
3456 .map(|entry| entry.repo_path.clone())
3457 .collect();
3458 self.unstage_entries(to_unstage, cx)
3459 }
3460
3461 pub fn commit(
3462 &mut self,
3463 message: SharedString,
3464 name_and_email: Option<(SharedString, SharedString)>,
3465 options: CommitOptions,
3466 _cx: &mut App,
3467 ) -> oneshot::Receiver<Result<()>> {
3468 let id = self.id;
3469
3470 self.send_job(Some("git commit".into()), move |git_repo, _cx| async move {
3471 match git_repo {
3472 RepositoryState::Local {
3473 backend,
3474 environment,
3475 ..
3476 } => {
3477 backend
3478 .commit(message, name_and_email, options, environment)
3479 .await
3480 }
3481 RepositoryState::Remote { project_id, client } => {
3482 let (name, email) = name_and_email.unzip();
3483 client
3484 .request(proto::Commit {
3485 project_id: project_id.0,
3486 repository_id: id.to_proto(),
3487 message: String::from(message),
3488 name: name.map(String::from),
3489 email: email.map(String::from),
3490 options: Some(proto::commit::CommitOptions {
3491 amend: options.amend,
3492 signoff: options.signoff,
3493 }),
3494 })
3495 .await
3496 .context("sending commit request")?;
3497
3498 Ok(())
3499 }
3500 }
3501 })
3502 }
3503
3504 pub fn fetch(
3505 &mut self,
3506 fetch_options: FetchOptions,
3507 askpass: AskPassDelegate,
3508 _cx: &mut App,
3509 ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
3510 let askpass_delegates = self.askpass_delegates.clone();
3511 let askpass_id = util::post_inc(&mut self.latest_askpass_id);
3512 let id = self.id;
3513
3514 self.send_job(Some("git fetch".into()), move |git_repo, cx| async move {
3515 match git_repo {
3516 RepositoryState::Local {
3517 backend,
3518 environment,
3519 ..
3520 } => backend.fetch(fetch_options, askpass, environment, cx).await,
3521 RepositoryState::Remote { project_id, client } => {
3522 askpass_delegates.lock().insert(askpass_id, askpass);
3523 let _defer = util::defer(|| {
3524 let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
3525 debug_assert!(askpass_delegate.is_some());
3526 });
3527
3528 let response = client
3529 .request(proto::Fetch {
3530 project_id: project_id.0,
3531 repository_id: id.to_proto(),
3532 askpass_id,
3533 remote: fetch_options.to_proto(),
3534 })
3535 .await
3536 .context("sending fetch request")?;
3537
3538 Ok(RemoteCommandOutput {
3539 stdout: response.stdout,
3540 stderr: response.stderr,
3541 })
3542 }
3543 }
3544 })
3545 }
3546
3547 pub fn push(
3548 &mut self,
3549 branch: SharedString,
3550 remote: SharedString,
3551 options: Option<PushOptions>,
3552 askpass: AskPassDelegate,
3553 cx: &mut Context<Self>,
3554 ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
3555 let askpass_delegates = self.askpass_delegates.clone();
3556 let askpass_id = util::post_inc(&mut self.latest_askpass_id);
3557 let id = self.id;
3558
3559 let args = options
3560 .map(|option| match option {
3561 PushOptions::SetUpstream => " --set-upstream",
3562 PushOptions::Force => " --force-with-lease",
3563 })
3564 .unwrap_or("");
3565
3566 let updates_tx = self
3567 .git_store()
3568 .and_then(|git_store| match &git_store.read(cx).state {
3569 GitStoreState::Local { downstream, .. } => downstream
3570 .as_ref()
3571 .map(|downstream| downstream.updates_tx.clone()),
3572 _ => None,
3573 });
3574
3575 let this = cx.weak_entity();
3576 self.send_job(
            Some(format!("git push{} {} {}", args, remote, branch).into()),
3578 move |git_repo, mut cx| async move {
3579 match git_repo {
3580 RepositoryState::Local {
3581 backend,
3582 environment,
3583 ..
3584 } => {
3585 let result = backend
3586 .push(
3587 branch.to_string(),
3588 remote.to_string(),
3589 options,
3590 askpass,
3591 environment.clone(),
3592 cx.clone(),
3593 )
3594 .await;
3595 if result.is_ok() {
3596 let branches = backend.branches().await?;
3597 let branch = branches.into_iter().find(|branch| branch.is_head);
                            log::info!("head branch after push is {branch:?}");
3599 let snapshot = this.update(&mut cx, |this, cx| {
3600 this.snapshot.branch = branch;
3601 let snapshot = this.snapshot.clone();
3602 cx.emit(RepositoryEvent::Updated {
3603 full_scan: false,
3604 new_instance: false,
3605 });
3606 snapshot
3607 })?;
3608 if let Some(updates_tx) = updates_tx {
3609 updates_tx
3610 .unbounded_send(DownstreamUpdate::UpdateRepository(snapshot))
3611 .ok();
3612 }
3613 }
3614 result
3615 }
3616 RepositoryState::Remote { project_id, client } => {
3617 askpass_delegates.lock().insert(askpass_id, askpass);
3618 let _defer = util::defer(|| {
3619 let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
3620 debug_assert!(askpass_delegate.is_some());
3621 });
3622 let response = client
3623 .request(proto::Push {
3624 project_id: project_id.0,
3625 repository_id: id.to_proto(),
3626 askpass_id,
3627 branch_name: branch.to_string(),
3628 remote_name: remote.to_string(),
3629 options: options.map(|options| match options {
3630 PushOptions::Force => proto::push::PushOptions::Force,
3631 PushOptions::SetUpstream => {
3632 proto::push::PushOptions::SetUpstream
3633 }
3634 }
3635 as i32),
3636 })
3637 .await
3638 .context("sending push request")?;
3639
3640 Ok(RemoteCommandOutput {
3641 stdout: response.stdout,
3642 stderr: response.stderr,
3643 })
3644 }
3645 }
3646 },
3647 )
3648 }
3649
3650 pub fn pull(
3651 &mut self,
3652 branch: SharedString,
3653 remote: SharedString,
3654 askpass: AskPassDelegate,
3655 _cx: &mut App,
3656 ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
3657 let askpass_delegates = self.askpass_delegates.clone();
3658 let askpass_id = util::post_inc(&mut self.latest_askpass_id);
3659 let id = self.id;
3660
3661 self.send_job(
3662 Some(format!("git pull {} {}", remote, branch).into()),
3663 move |git_repo, cx| async move {
3664 match git_repo {
3665 RepositoryState::Local {
3666 backend,
3667 environment,
3668 ..
3669 } => {
3670 backend
3671 .pull(
3672 branch.to_string(),
3673 remote.to_string(),
3674 askpass,
3675 environment.clone(),
3676 cx,
3677 )
3678 .await
3679 }
3680 RepositoryState::Remote { project_id, client } => {
3681 askpass_delegates.lock().insert(askpass_id, askpass);
3682 let _defer = util::defer(|| {
3683 let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
3684 debug_assert!(askpass_delegate.is_some());
3685 });
3686 let response = client
3687 .request(proto::Pull {
3688 project_id: project_id.0,
3689 repository_id: id.to_proto(),
3690 askpass_id,
3691 branch_name: branch.to_string(),
3692 remote_name: remote.to_string(),
3693 })
3694 .await
3695 .context("sending pull request")?;
3696
3697 Ok(RemoteCommandOutput {
3698 stdout: response.stdout,
3699 stderr: response.stderr,
3700 })
3701 }
3702 }
3703 },
3704 )
3705 }
3706
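    /// Writes new content for a path's index entry using a job keyed by that path.
    /// Once the write completes, the supplied hunk staging operation count is recorded
    /// as `hunk_staging_operation_count_as_of_write` on the buffer's diff state.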
3707 fn spawn_set_index_text_job(
3708 &mut self,
3709 path: RepoPath,
3710 content: Option<String>,
3711 hunk_staging_operation_count: Option<usize>,
3712 cx: &mut Context<Self>,
3713 ) -> oneshot::Receiver<anyhow::Result<()>> {
3714 let id = self.id;
3715 let this = cx.weak_entity();
3716 let git_store = self.git_store.clone();
3717 self.send_keyed_job(
3718 Some(GitJobKey::WriteIndex(path.clone())),
3719 None,
3720 move |git_repo, mut cx| async move {
3721 log::debug!("start updating index text for buffer {}", path.display());
3722 match git_repo {
3723 RepositoryState::Local {
3724 backend,
3725 environment,
3726 ..
3727 } => {
3728 backend
3729 .set_index_text(path.clone(), content, environment.clone())
3730 .await?;
3731 }
3732 RepositoryState::Remote { project_id, client } => {
3733 client
3734 .request(proto::SetIndexText {
3735 project_id: project_id.0,
3736 repository_id: id.to_proto(),
3737 path: path.as_ref().to_proto(),
3738 text: content,
3739 })
3740 .await?;
3741 }
3742 }
3743 log::debug!("finish updating index text for buffer {}", path.display());
3744
3745 if let Some(hunk_staging_operation_count) = hunk_staging_operation_count {
3746 let project_path = this
3747 .read_with(&cx, |this, cx| this.repo_path_to_project_path(&path, cx))
3748 .ok()
3749 .flatten();
3750 git_store.update(&mut cx, |git_store, cx| {
3751 let buffer_id = git_store
3752 .buffer_store
3753 .read(cx)
3754 .get_by_path(&project_path?)?
3755 .read(cx)
3756 .remote_id();
3757 let diff_state = git_store.diffs.get(&buffer_id)?;
3758 diff_state.update(cx, |diff_state, _| {
3759 diff_state.hunk_staging_operation_count_as_of_write =
3760 hunk_staging_operation_count;
3761 });
3762 Some(())
3763 })?;
3764 }
3765 Ok(())
3766 },
3767 )
3768 }
3769
3770 pub fn get_remotes(
3771 &mut self,
3772 branch_name: Option<String>,
3773 ) -> oneshot::Receiver<Result<Vec<Remote>>> {
3774 let id = self.id;
3775 self.send_job(None, move |repo, _cx| async move {
3776 match repo {
3777 RepositoryState::Local { backend, .. } => backend.get_remotes(branch_name).await,
3778 RepositoryState::Remote { project_id, client } => {
3779 let response = client
3780 .request(proto::GetRemotes {
3781 project_id: project_id.0,
3782 repository_id: id.to_proto(),
3783 branch_name,
3784 })
3785 .await?;
3786
3787 let remotes = response
3788 .remotes
3789 .into_iter()
                        .map(|remote| git::repository::Remote {
                            name: remote.name.into(),
                        })
3793 .collect();
3794
3795 Ok(remotes)
3796 }
3797 }
3798 })
3799 }
3800
3801 pub fn branches(&mut self) -> oneshot::Receiver<Result<Vec<Branch>>> {
3802 let id = self.id;
3803 self.send_job(None, move |repo, _| async move {
3804 match repo {
3805 RepositoryState::Local { backend, .. } => backend.branches().await,
3806 RepositoryState::Remote { project_id, client } => {
3807 let response = client
3808 .request(proto::GitGetBranches {
3809 project_id: project_id.0,
3810 repository_id: id.to_proto(),
3811 })
3812 .await?;
3813
3814 let branches = response
3815 .branches
3816 .into_iter()
3817 .map(|branch| proto_to_branch(&branch))
3818 .collect();
3819
3820 Ok(branches)
3821 }
3822 }
3823 })
3824 }
3825
3826 pub fn diff(&mut self, diff_type: DiffType, _cx: &App) -> oneshot::Receiver<Result<String>> {
3827 let id = self.id;
3828 self.send_job(None, move |repo, _cx| async move {
3829 match repo {
3830 RepositoryState::Local { backend, .. } => backend.diff(diff_type).await,
3831 RepositoryState::Remote { project_id, client } => {
3832 let response = client
3833 .request(proto::GitDiff {
3834 project_id: project_id.0,
3835 repository_id: id.to_proto(),
3836 diff_type: match diff_type {
3837 DiffType::HeadToIndex => {
3838 proto::git_diff::DiffType::HeadToIndex.into()
3839 }
3840 DiffType::HeadToWorktree => {
3841 proto::git_diff::DiffType::HeadToWorktree.into()
3842 }
3843 },
3844 })
3845 .await?;
3846
3847 Ok(response.diff)
3848 }
3849 }
3850 })
3851 }
3852
3853 pub fn create_branch(&mut self, branch_name: String) -> oneshot::Receiver<Result<()>> {
3854 let id = self.id;
3855 self.send_job(
3856 Some(format!("git switch -c {branch_name}").into()),
3857 move |repo, _cx| async move {
3858 match repo {
3859 RepositoryState::Local { backend, .. } => {
3860 backend.create_branch(branch_name).await
3861 }
3862 RepositoryState::Remote { project_id, client } => {
3863 client
3864 .request(proto::GitCreateBranch {
3865 project_id: project_id.0,
3866 repository_id: id.to_proto(),
3867 branch_name,
3868 })
3869 .await?;
3870
3871 Ok(())
3872 }
3873 }
3874 },
3875 )
3876 }
3877
3878 pub fn change_branch(&mut self, branch_name: String) -> oneshot::Receiver<Result<()>> {
3879 let id = self.id;
3880 self.send_job(
3881 Some(format!("git switch {branch_name}").into()),
3882 move |repo, _cx| async move {
3883 match repo {
3884 RepositoryState::Local { backend, .. } => {
3885 backend.change_branch(branch_name).await
3886 }
3887 RepositoryState::Remote { project_id, client } => {
3888 client
3889 .request(proto::GitChangeBranch {
3890 project_id: project_id.0,
3891 repository_id: id.to_proto(),
3892 branch_name,
3893 })
3894 .await?;
3895
3896 Ok(())
3897 }
3898 }
3899 },
3900 )
3901 }
3902
3903 pub fn check_for_pushed_commits(&mut self) -> oneshot::Receiver<Result<Vec<SharedString>>> {
3904 let id = self.id;
3905 self.send_job(None, move |repo, _cx| async move {
3906 match repo {
3907 RepositoryState::Local { backend, .. } => backend.check_for_pushed_commit().await,
3908 RepositoryState::Remote { project_id, client } => {
3909 let response = client
3910 .request(proto::CheckForPushedCommits {
3911 project_id: project_id.0,
3912 repository_id: id.to_proto(),
3913 })
3914 .await?;
3915
3916 let branches = response.pushed_to.into_iter().map(Into::into).collect();
3917
3918 Ok(branches)
3919 }
3920 }
3921 })
3922 }
3923
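    /// Captures a checkpoint of the repository's current state.
    ///
    /// Checkpoints are only supported for local repositories; remote repositories return
    /// an error.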
3924 pub fn checkpoint(&mut self) -> oneshot::Receiver<Result<GitRepositoryCheckpoint>> {
3925 self.send_job(None, |repo, _cx| async move {
3926 match repo {
3927 RepositoryState::Local { backend, .. } => backend.checkpoint().await,
3928 RepositoryState::Remote { .. } => anyhow::bail!("not implemented yet"),
3929 }
3930 })
3931 }
3932
3933 pub fn restore_checkpoint(
3934 &mut self,
3935 checkpoint: GitRepositoryCheckpoint,
3936 ) -> oneshot::Receiver<Result<()>> {
3937 self.send_job(None, move |repo, _cx| async move {
3938 match repo {
3939 RepositoryState::Local { backend, .. } => {
3940 backend.restore_checkpoint(checkpoint).await
3941 }
3942 RepositoryState::Remote { .. } => anyhow::bail!("not implemented yet"),
3943 }
3944 })
3945 }
3946
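    /// Applies a `proto::UpdateRepository` message received from the host, replacing the
    /// snapshot's branch, head commit, merge conflicts, and file statuses with the values
    /// it carries.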
3947 pub(crate) fn apply_remote_update(
3948 &mut self,
3949 update: proto::UpdateRepository,
3950 is_new: bool,
3951 cx: &mut Context<Self>,
3952 ) -> Result<()> {
3953 let conflicted_paths = TreeSet::from_ordered_entries(
3954 update
3955 .current_merge_conflicts
3956 .into_iter()
3957 .map(|path| RepoPath(Path::new(&path).into())),
3958 );
3959 self.snapshot.branch = update.branch_summary.as_ref().map(proto_to_branch);
3960 self.snapshot.head_commit = update
3961 .head_commit_details
3962 .as_ref()
3963 .map(proto_to_commit_details);
3964
3965 self.snapshot.merge.conflicted_paths = conflicted_paths;
3966
3967 let edits = update
3968 .removed_statuses
3969 .into_iter()
3970 .map(|path| sum_tree::Edit::Remove(PathKey(FromProto::from_proto(path))))
3971 .chain(
3972 update
3973 .updated_statuses
3974 .into_iter()
3975 .filter_map(|updated_status| {
3976 Some(sum_tree::Edit::Insert(updated_status.try_into().log_err()?))
3977 }),
3978 )
3979 .collect::<Vec<_>>();
3980 self.snapshot.statuses_by_path.edit(edits, &());
3981 if update.is_last_update {
3982 self.snapshot.scan_id = update.scan_id;
3983 }
3984 cx.emit(RepositoryEvent::Updated {
3985 full_scan: true,
3986 new_instance: is_new,
3987 });
3988 Ok(())
3989 }
3990
3991 pub fn compare_checkpoints(
3992 &mut self,
3993 left: GitRepositoryCheckpoint,
3994 right: GitRepositoryCheckpoint,
3995 ) -> oneshot::Receiver<Result<bool>> {
3996 self.send_job(None, move |repo, _cx| async move {
3997 match repo {
3998 RepositoryState::Local { backend, .. } => {
3999 backend.compare_checkpoints(left, right).await
4000 }
4001 RepositoryState::Remote { .. } => anyhow::bail!("not implemented yet"),
4002 }
4003 })
4004 }
4005
4006 pub fn diff_checkpoints(
4007 &mut self,
4008 base_checkpoint: GitRepositoryCheckpoint,
4009 target_checkpoint: GitRepositoryCheckpoint,
4010 ) -> oneshot::Receiver<Result<String>> {
4011 self.send_job(None, move |repo, _cx| async move {
4012 match repo {
4013 RepositoryState::Local { backend, .. } => {
4014 backend
4015 .diff_checkpoints(base_checkpoint, target_checkpoint)
4016 .await
4017 }
4018 RepositoryState::Remote { .. } => anyhow::bail!("not implemented yet"),
4019 }
4020 })
4021 }
4022
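    /// Schedules a full git status scan as a keyed `ReloadGitState` job, so that
    /// repeatedly scheduled scans collapse into the most recent one, and forwards the
    /// resulting snapshot downstream when an `updates_tx` is provided.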
4023 fn schedule_scan(
4024 &mut self,
4025 updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
4026 cx: &mut Context<Self>,
4027 ) {
4028 let this = cx.weak_entity();
4029 let _ = self.send_keyed_job(
4030 Some(GitJobKey::ReloadGitState),
4031 None,
4032 |state, mut cx| async move {
4033 log::debug!("run scheduled git status scan");
4034
4035 let Some(this) = this.upgrade() else {
4036 return Ok(());
4037 };
4038 let RepositoryState::Local { backend, .. } = state else {
4039 bail!("not a local repository")
4040 };
4041 let (snapshot, events) = this
4042 .read_with(&mut cx, |this, _| {
4043 compute_snapshot(
4044 this.id,
4045 this.work_directory_abs_path.clone(),
4046 this.snapshot.clone(),
4047 backend.clone(),
4048 )
4049 })?
4050 .await?;
4051 this.update(&mut cx, |this, cx| {
4052 this.snapshot = snapshot.clone();
4053 for event in events {
4054 cx.emit(event);
4055 }
4056 })?;
4057 if let Some(updates_tx) = updates_tx {
4058 updates_tx
4059 .unbounded_send(DownstreamUpdate::UpdateRepository(snapshot))
4060 .ok();
4061 }
4062 Ok(())
4063 },
4064 );
4065 }
4066
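    /// Spawns the background worker that services git jobs for a local repository: it
    /// resolves the working directory environment, opens the repository through the
    /// filesystem backend, registers any additional git hosting providers, and then
    /// drains the job queue in order.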
4067 fn spawn_local_git_worker(
4068 work_directory_abs_path: Arc<Path>,
4069 dot_git_abs_path: Arc<Path>,
4070 _repository_dir_abs_path: Arc<Path>,
4071 _common_dir_abs_path: Arc<Path>,
4072 project_environment: WeakEntity<ProjectEnvironment>,
4073 fs: Arc<dyn Fs>,
4074 cx: &mut Context<Self>,
4075 ) -> mpsc::UnboundedSender<GitJob> {
4076 let (job_tx, mut job_rx) = mpsc::unbounded::<GitJob>();
4077
4078 cx.spawn(async move |_, cx| {
4079 let environment = project_environment
4080 .upgrade()
4081 .context("missing project environment")?
4082 .update(cx, |project_environment, cx| {
4083 project_environment.get_directory_environment(work_directory_abs_path.clone(), cx)
4084 })?
4085 .await
4086 .unwrap_or_else(|| {
4087 log::error!("failed to get working directory environment for repository {work_directory_abs_path:?}");
4088 HashMap::default()
4089 });
4090 let backend = cx
4091 .background_spawn(async move {
4092 fs.open_repo(&dot_git_abs_path)
4093 .with_context(|| format!("opening repository at {dot_git_abs_path:?}"))
4094 })
4095 .await?;
4096
4097 if let Some(git_hosting_provider_registry) =
4098 cx.update(|cx| GitHostingProviderRegistry::try_global(cx))?
4099 {
4100 git_hosting_providers::register_additional_providers(
4101 git_hosting_provider_registry,
4102 backend.clone(),
4103 );
4104 }
4105
4106 let state = RepositoryState::Local {
4107 backend,
4108 environment: Arc::new(environment),
4109 };
4110 let mut jobs = VecDeque::new();
4111 loop {
4112 while let Ok(Some(next_job)) = job_rx.try_next() {
4113 jobs.push_back(next_job);
4114 }
4115
4116 if let Some(job) = jobs.pop_front() {
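                    // Coalesce keyed jobs: if another job with the same key is queued
                    // behind this one, skip this job and let the most recent one run.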
4117 if let Some(current_key) = &job.key {
4118 if jobs
4119 .iter()
4120 .any(|other_job| other_job.key.as_ref() == Some(current_key))
4121 {
4122 continue;
4123 }
4124 }
4125 (job.job)(state.clone(), cx).await;
4126 } else if let Some(job) = job_rx.next().await {
4127 jobs.push_back(job);
4128 } else {
4129 break;
4130 }
4131 }
4132 anyhow::Ok(())
4133 })
4134 .detach_and_log_err(cx);
4135
4136 job_tx
4137 }
4138
4139 fn spawn_remote_git_worker(
4140 project_id: ProjectId,
4141 client: AnyProtoClient,
4142 cx: &mut Context<Self>,
4143 ) -> mpsc::UnboundedSender<GitJob> {
4144 let (job_tx, mut job_rx) = mpsc::unbounded::<GitJob>();
4145
4146 cx.spawn(async move |_, cx| {
4147 let state = RepositoryState::Remote { project_id, client };
4148 let mut jobs = VecDeque::new();
4149 loop {
4150 while let Ok(Some(next_job)) = job_rx.try_next() {
4151 jobs.push_back(next_job);
4152 }
4153
4154 if let Some(job) = jobs.pop_front() {
4155 if let Some(current_key) = &job.key {
4156 if jobs
4157 .iter()
4158 .any(|other_job| other_job.key.as_ref() == Some(current_key))
4159 {
4160 continue;
4161 }
4162 }
4163 (job.job)(state.clone(), cx).await;
4164 } else if let Some(job) = job_rx.next().await {
4165 jobs.push_back(job);
4166 } else {
4167 break;
4168 }
4169 }
4170 anyhow::Ok(())
4171 })
4172 .detach_and_log_err(cx);
4173
4174 job_tx
4175 }
4176
4177 fn load_staged_text(
4178 &mut self,
4179 buffer_id: BufferId,
4180 repo_path: RepoPath,
4181 cx: &App,
4182 ) -> Task<Result<Option<String>>> {
4183 let rx = self.send_job(None, move |state, _| async move {
4184 match state {
4185 RepositoryState::Local { backend, .. } => {
4186 anyhow::Ok(backend.load_index_text(repo_path).await)
4187 }
4188 RepositoryState::Remote { project_id, client } => {
4189 let response = client
4190 .request(proto::OpenUnstagedDiff {
4191 project_id: project_id.to_proto(),
4192 buffer_id: buffer_id.to_proto(),
4193 })
4194 .await?;
4195 Ok(response.staged_text)
4196 }
4197 }
4198 });
4199 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
4200 }
4201
4202 fn load_committed_text(
4203 &mut self,
4204 buffer_id: BufferId,
4205 repo_path: RepoPath,
4206 cx: &App,
4207 ) -> Task<Result<DiffBasesChange>> {
4208 let rx = self.send_job(None, move |state, _| async move {
4209 match state {
4210 RepositoryState::Local { backend, .. } => {
4211 let committed_text = backend.load_committed_text(repo_path.clone()).await;
4212 let staged_text = backend.load_index_text(repo_path).await;
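                    // When the index matches HEAD, report a single shared base text
                    // (`SetBoth`); otherwise send the index and head texts separately.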
4213 let diff_bases_change = if committed_text == staged_text {
4214 DiffBasesChange::SetBoth(committed_text)
4215 } else {
4216 DiffBasesChange::SetEach {
4217 index: staged_text,
4218 head: committed_text,
4219 }
4220 };
4221 anyhow::Ok(diff_bases_change)
4222 }
4223 RepositoryState::Remote { project_id, client } => {
4224 use proto::open_uncommitted_diff_response::Mode;
4225
4226 let response = client
4227 .request(proto::OpenUncommittedDiff {
4228 project_id: project_id.to_proto(),
4229 buffer_id: buffer_id.to_proto(),
4230 })
4231 .await?;
4232 let mode = Mode::from_i32(response.mode).context("Invalid mode")?;
4233 let bases = match mode {
4234 Mode::IndexMatchesHead => DiffBasesChange::SetBoth(response.committed_text),
4235 Mode::IndexAndHead => DiffBasesChange::SetEach {
4236 head: response.committed_text,
4237 index: response.staged_text,
4238 },
4239 };
4240 Ok(bases)
4241 }
4242 }
4243 });
4244
4245 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
4246 }
4247
4248 fn paths_changed(
4249 &mut self,
4250 paths: Vec<RepoPath>,
4251 updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
4252 cx: &mut Context<Self>,
4253 ) {
4254 self.paths_needing_status_update.extend(paths);
4255
4256 let this = cx.weak_entity();
4257 let _ = self.send_keyed_job(
4258 Some(GitJobKey::RefreshStatuses),
4259 None,
4260 |state, mut cx| async move {
4261 let (prev_snapshot, mut changed_paths) = this.update(&mut cx, |this, _| {
4262 (
4263 this.snapshot.clone(),
4264 mem::take(&mut this.paths_needing_status_update),
4265 )
4266 })?;
4267 let RepositoryState::Local { backend, .. } = state else {
4268 bail!("not a local repository")
4269 };
4270
4271 let paths = changed_paths.iter().cloned().collect::<Vec<_>>();
4272 let statuses = backend.status(&paths).await?;
4273
4274 let changed_path_statuses = cx
4275 .background_spawn(async move {
4276 let mut changed_path_statuses = Vec::new();
4277 let prev_statuses = prev_snapshot.statuses_by_path.clone();
4278 let mut cursor = prev_statuses.cursor::<PathProgress>(&());
4279
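                        // Record an edit for each path whose status differs from the
                        // previous snapshot; requested paths that no longer have a status
                        // entry are removed below.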
4280 for (repo_path, status) in &*statuses.entries {
4281 changed_paths.remove(repo_path);
                            if cursor.seek_forward(&PathTarget::Path(repo_path), Bias::Left)
                                && cursor.item().is_some_and(|entry| entry.status == *status)
                            {
                                continue;
                            }
4287
4288 changed_path_statuses.push(Edit::Insert(StatusEntry {
4289 repo_path: repo_path.clone(),
4290 status: *status,
4291 }));
4292 }
4293 let mut cursor = prev_statuses.cursor::<PathProgress>(&());
4294 for path in changed_paths.into_iter() {
4295 if cursor.seek_forward(&PathTarget::Path(&path), Bias::Left) {
4296 changed_path_statuses.push(Edit::Remove(PathKey(path.0)));
4297 }
4298 }
4299 changed_path_statuses
4300 })
4301 .await;
4302
4303 this.update(&mut cx, |this, cx| {
4304 if !changed_path_statuses.is_empty() {
4305 this.snapshot
4306 .statuses_by_path
4307 .edit(changed_path_statuses, &());
4308 this.snapshot.scan_id += 1;
4309 if let Some(updates_tx) = updates_tx {
4310 updates_tx
4311 .unbounded_send(DownstreamUpdate::UpdateRepository(
4312 this.snapshot.clone(),
4313 ))
4314 .ok();
4315 }
4316 }
4317 cx.emit(RepositoryEvent::Updated {
4318 full_scan: false,
4319 new_instance: false,
4320 });
4321 })
4322 },
4323 );
4324 }
4325
    /// Returns the currently running git command, if any, along with when it started.
4327 pub fn current_job(&self) -> Option<JobInfo> {
4328 self.active_jobs.values().next().cloned()
4329 }
4330
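    /// Enqueues a no-op job; the returned receiver resolves once all previously enqueued
    /// jobs have been processed.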
4331 pub fn barrier(&mut self) -> oneshot::Receiver<()> {
4332 self.send_job(None, |_, _| async {})
4333 }
4334}
4335
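/// Builds a permalink for a file inside an extracted registry crate by locating the
/// `.cargo_vcs_info.json` that cargo packages alongside the sources, reading the recorded
/// commit SHA and path, and resolving the crate's `Cargo.toml` `repository` field against
/// the configured git hosting providers.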
4336fn get_permalink_in_rust_registry_src(
4337 provider_registry: Arc<GitHostingProviderRegistry>,
4338 path: PathBuf,
4339 selection: Range<u32>,
4340) -> Result<url::Url> {
4341 #[derive(Deserialize)]
4342 struct CargoVcsGit {
4343 sha1: String,
4344 }
4345
4346 #[derive(Deserialize)]
4347 struct CargoVcsInfo {
4348 git: CargoVcsGit,
4349 path_in_vcs: String,
4350 }
4351
4352 #[derive(Deserialize)]
4353 struct CargoPackage {
4354 repository: String,
4355 }
4356
4357 #[derive(Deserialize)]
4358 struct CargoToml {
4359 package: CargoPackage,
4360 }
4361
4362 let Some((dir, cargo_vcs_info_json)) = path.ancestors().skip(1).find_map(|dir| {
4363 let json = std::fs::read_to_string(dir.join(".cargo_vcs_info.json")).ok()?;
4364 Some((dir, json))
4365 }) else {
4366 bail!("No .cargo_vcs_info.json found in parent directories")
4367 };
4368 let cargo_vcs_info = serde_json::from_str::<CargoVcsInfo>(&cargo_vcs_info_json)?;
4369 let cargo_toml = std::fs::read_to_string(dir.join("Cargo.toml"))?;
4370 let manifest = toml::from_str::<CargoToml>(&cargo_toml)?;
4371 let (provider, remote) = parse_git_remote_url(provider_registry, &manifest.package.repository)
4372 .context("parsing package.repository field of manifest")?;
4373 let path = PathBuf::from(cargo_vcs_info.path_in_vcs).join(path.strip_prefix(dir).unwrap());
4374 let permalink = provider.build_permalink(
4375 remote,
4376 BuildPermalinkParams {
4377 sha: &cargo_vcs_info.git.sha1,
4378 path: &path.to_string_lossy(),
4379 selection: Some(selection),
4380 },
4381 );
4382 Ok(permalink)
4383}
4384
4385fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
4386 let Some(blame) = blame else {
4387 return proto::BlameBufferResponse {
4388 blame_response: None,
4389 };
4390 };
4391
4392 let entries = blame
4393 .entries
4394 .into_iter()
4395 .map(|entry| proto::BlameEntry {
4396 sha: entry.sha.as_bytes().into(),
4397 start_line: entry.range.start,
4398 end_line: entry.range.end,
4399 original_line_number: entry.original_line_number,
4400 author: entry.author,
4401 author_mail: entry.author_mail,
4402 author_time: entry.author_time,
4403 author_tz: entry.author_tz,
4404 committer: entry.committer_name,
4405 committer_mail: entry.committer_email,
4406 committer_time: entry.committer_time,
4407 committer_tz: entry.committer_tz,
4408 summary: entry.summary,
4409 previous: entry.previous,
4410 filename: entry.filename,
4411 })
4412 .collect::<Vec<_>>();
4413
4414 let messages = blame
4415 .messages
4416 .into_iter()
4417 .map(|(oid, message)| proto::CommitMessage {
4418 oid: oid.as_bytes().into(),
4419 message,
4420 })
4421 .collect::<Vec<_>>();
4422
4423 proto::BlameBufferResponse {
4424 blame_response: Some(proto::blame_buffer_response::BlameResponse {
4425 entries,
4426 messages,
4427 remote_url: blame.remote_url,
4428 }),
4429 }
4430}
4431
4432fn deserialize_blame_buffer_response(
4433 response: proto::BlameBufferResponse,
4434) -> Option<git::blame::Blame> {
4435 let response = response.blame_response?;
4436 let entries = response
4437 .entries
4438 .into_iter()
4439 .filter_map(|entry| {
4440 Some(git::blame::BlameEntry {
4441 sha: git::Oid::from_bytes(&entry.sha).ok()?,
4442 range: entry.start_line..entry.end_line,
4443 original_line_number: entry.original_line_number,
4444 committer_name: entry.committer,
4445 committer_time: entry.committer_time,
4446 committer_tz: entry.committer_tz,
4447 committer_email: entry.committer_mail,
4448 author: entry.author,
4449 author_mail: entry.author_mail,
4450 author_time: entry.author_time,
4451 author_tz: entry.author_tz,
4452 summary: entry.summary,
4453 previous: entry.previous,
4454 filename: entry.filename,
4455 })
4456 })
4457 .collect::<Vec<_>>();
4458
4459 let messages = response
4460 .messages
4461 .into_iter()
4462 .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
4463 .collect::<HashMap<_, _>>();
4464
4465 Some(Blame {
4466 entries,
4467 messages,
4468 remote_url: response.remote_url,
4469 })
4470}
4471
4472fn branch_to_proto(branch: &git::repository::Branch) -> proto::Branch {
4473 proto::Branch {
4474 is_head: branch.is_head,
4475 ref_name: branch.ref_name.to_string(),
4476 unix_timestamp: branch
4477 .most_recent_commit
4478 .as_ref()
4479 .map(|commit| commit.commit_timestamp as u64),
4480 upstream: branch.upstream.as_ref().map(|upstream| proto::GitUpstream {
4481 ref_name: upstream.ref_name.to_string(),
4482 tracking: upstream
4483 .tracking
4484 .status()
4485 .map(|upstream| proto::UpstreamTracking {
4486 ahead: upstream.ahead as u64,
4487 behind: upstream.behind as u64,
4488 }),
4489 }),
4490 most_recent_commit: branch
4491 .most_recent_commit
4492 .as_ref()
4493 .map(|commit| proto::CommitSummary {
4494 sha: commit.sha.to_string(),
4495 subject: commit.subject.to_string(),
4496 commit_timestamp: commit.commit_timestamp,
4497 }),
4498 }
4499}
4500
4501fn proto_to_branch(proto: &proto::Branch) -> git::repository::Branch {
4502 git::repository::Branch {
4503 is_head: proto.is_head,
4504 ref_name: proto.ref_name.clone().into(),
4505 upstream: proto
4506 .upstream
4507 .as_ref()
4508 .map(|upstream| git::repository::Upstream {
4509 ref_name: upstream.ref_name.to_string().into(),
4510 tracking: upstream
4511 .tracking
4512 .as_ref()
4513 .map(|tracking| {
4514 git::repository::UpstreamTracking::Tracked(UpstreamTrackingStatus {
4515 ahead: tracking.ahead as u32,
4516 behind: tracking.behind as u32,
4517 })
4518 })
4519 .unwrap_or(git::repository::UpstreamTracking::Gone),
4520 }),
4521 most_recent_commit: proto.most_recent_commit.as_ref().map(|commit| {
4522 git::repository::CommitSummary {
4523 sha: commit.sha.to_string().into(),
4524 subject: commit.subject.to_string().into(),
4525 commit_timestamp: commit.commit_timestamp,
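                // Parent information isn't serialized in the branch summary (see
                // `branch_to_proto`), so assume the commit has a parent.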
4526 has_parent: true,
4527 }
4528 }),
4529 }
4530}
4531
4532fn commit_details_to_proto(commit: &CommitDetails) -> proto::GitCommitDetails {
4533 proto::GitCommitDetails {
4534 sha: commit.sha.to_string(),
4535 message: commit.message.to_string(),
4536 commit_timestamp: commit.commit_timestamp,
4537 author_email: commit.author_email.to_string(),
4538 author_name: commit.author_name.to_string(),
4539 }
4540}
4541
4542fn proto_to_commit_details(proto: &proto::GitCommitDetails) -> CommitDetails {
4543 CommitDetails {
4544 sha: proto.sha.clone().into(),
4545 message: proto.message.clone().into(),
4546 commit_timestamp: proto.commit_timestamp,
4547 author_email: proto.author_email.clone().into(),
4548 author_name: proto.author_name.clone().into(),
4549 }
4550}
4551
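/// Recomputes the repository snapshot by querying the backend for its branches, file
/// statuses, merge details, and head commit, returning the new snapshot together with the
/// repository events implied by the changes.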
4552async fn compute_snapshot(
4553 id: RepositoryId,
4554 work_directory_abs_path: Arc<Path>,
4555 prev_snapshot: RepositorySnapshot,
4556 backend: Arc<dyn GitRepository>,
4557) -> Result<(RepositorySnapshot, Vec<RepositoryEvent>)> {
4558 let mut events = Vec::new();
4559 let branches = backend.branches().await?;
4560 let branch = branches.into_iter().find(|branch| branch.is_head);
4561 let statuses = backend
4562 .status(std::slice::from_ref(&WORK_DIRECTORY_REPO_PATH))
4563 .await?;
4564 let statuses_by_path = SumTree::from_iter(
4565 statuses
4566 .entries
4567 .iter()
4568 .map(|(repo_path, status)| StatusEntry {
4569 repo_path: repo_path.clone(),
4570 status: *status,
4571 }),
4572 &(),
4573 );
4574 let (merge_details, merge_heads_changed) =
4575 MergeDetails::load(&backend, &statuses_by_path, &prev_snapshot).await?;
4576 log::debug!("new merge details (changed={merge_heads_changed:?}): {merge_details:?}");
4577
4578 if merge_heads_changed
4579 || branch != prev_snapshot.branch
4580 || statuses_by_path != prev_snapshot.statuses_by_path
4581 {
4582 events.push(RepositoryEvent::Updated {
4583 full_scan: true,
4584 new_instance: false,
4585 });
4586 }
4587
    // Merge conflict paths are cached so that staging or unstaging files doesn't change
    // them; they are only refreshed when the merge heads change (at commit time, etc.).
4590 if merge_heads_changed {
4591 events.push(RepositoryEvent::MergeHeadsChanged);
4592 }
4593
    // Load the head commit directly from the backend; this is useful when `branch` is
    // `None`, e.g. in a detached HEAD state.
4595 let head_commit = match backend.head_sha().await {
4596 Some(head_sha) => backend.show(head_sha).await.log_err(),
4597 None => None,
4598 };
4599
4600 let snapshot = RepositorySnapshot {
4601 id,
4602 statuses_by_path,
4603 work_directory_abs_path,
4604 scan_id: prev_snapshot.scan_id + 1,
4605 branch,
4606 head_commit,
4607 merge: merge_details,
4608 };
4609
4610 Ok((snapshot, events))
4611}
4612
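/// Converts a file status received over the wire into a `FileStatus`, falling back to the
/// flat `simple_status` code when no structured variant is present.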
4613fn status_from_proto(
4614 simple_status: i32,
4615 status: Option<proto::GitFileStatus>,
4616) -> anyhow::Result<FileStatus> {
4617 use proto::git_file_status::Variant;
4618
4619 let Some(variant) = status.and_then(|status| status.variant) else {
4620 let code = proto::GitStatus::from_i32(simple_status)
4621 .with_context(|| format!("Invalid git status code: {simple_status}"))?;
4622 let result = match code {
4623 proto::GitStatus::Added => TrackedStatus {
4624 worktree_status: StatusCode::Added,
4625 index_status: StatusCode::Unmodified,
4626 }
4627 .into(),
4628 proto::GitStatus::Modified => TrackedStatus {
4629 worktree_status: StatusCode::Modified,
4630 index_status: StatusCode::Unmodified,
4631 }
4632 .into(),
4633 proto::GitStatus::Conflict => UnmergedStatus {
4634 first_head: UnmergedStatusCode::Updated,
4635 second_head: UnmergedStatusCode::Updated,
4636 }
4637 .into(),
4638 proto::GitStatus::Deleted => TrackedStatus {
4639 worktree_status: StatusCode::Deleted,
4640 index_status: StatusCode::Unmodified,
4641 }
4642 .into(),
4643 _ => anyhow::bail!("Invalid code for simple status: {simple_status}"),
4644 };
4645 return Ok(result);
4646 };
4647
4648 let result = match variant {
4649 Variant::Untracked(_) => FileStatus::Untracked,
4650 Variant::Ignored(_) => FileStatus::Ignored,
4651 Variant::Unmerged(unmerged) => {
4652 let [first_head, second_head] =
4653 [unmerged.first_head, unmerged.second_head].map(|head| {
4654 let code = proto::GitStatus::from_i32(head)
4655 .with_context(|| format!("Invalid git status code: {head}"))?;
4656 let result = match code {
4657 proto::GitStatus::Added => UnmergedStatusCode::Added,
4658 proto::GitStatus::Updated => UnmergedStatusCode::Updated,
4659 proto::GitStatus::Deleted => UnmergedStatusCode::Deleted,
4660 _ => anyhow::bail!("Invalid code for unmerged status: {code:?}"),
4661 };
4662 Ok(result)
4663 });
4664 let [first_head, second_head] = [first_head?, second_head?];
4665 UnmergedStatus {
4666 first_head,
4667 second_head,
4668 }
4669 .into()
4670 }
4671 Variant::Tracked(tracked) => {
4672 let [index_status, worktree_status] = [tracked.index_status, tracked.worktree_status]
4673 .map(|status| {
4674 let code = proto::GitStatus::from_i32(status)
4675 .with_context(|| format!("Invalid git status code: {status}"))?;
4676 let result = match code {
4677 proto::GitStatus::Modified => StatusCode::Modified,
4678 proto::GitStatus::TypeChanged => StatusCode::TypeChanged,
4679 proto::GitStatus::Added => StatusCode::Added,
4680 proto::GitStatus::Deleted => StatusCode::Deleted,
4681 proto::GitStatus::Renamed => StatusCode::Renamed,
4682 proto::GitStatus::Copied => StatusCode::Copied,
4683 proto::GitStatus::Unmodified => StatusCode::Unmodified,
4684 _ => anyhow::bail!("Invalid code for tracked status: {code:?}"),
4685 };
4686 Ok(result)
4687 });
4688 let [index_status, worktree_status] = [index_status?, worktree_status?];
4689 TrackedStatus {
4690 index_status,
4691 worktree_status,
4692 }
4693 .into()
4694 }
4695 };
4696 Ok(result)
4697}
4698
4699fn status_to_proto(status: FileStatus) -> proto::GitFileStatus {
4700 use proto::git_file_status::{Tracked, Unmerged, Variant};
4701
4702 let variant = match status {
4703 FileStatus::Untracked => Variant::Untracked(Default::default()),
4704 FileStatus::Ignored => Variant::Ignored(Default::default()),
4705 FileStatus::Unmerged(UnmergedStatus {
4706 first_head,
4707 second_head,
4708 }) => Variant::Unmerged(Unmerged {
4709 first_head: unmerged_status_to_proto(first_head),
4710 second_head: unmerged_status_to_proto(second_head),
4711 }),
4712 FileStatus::Tracked(TrackedStatus {
4713 index_status,
4714 worktree_status,
4715 }) => Variant::Tracked(Tracked {
4716 index_status: tracked_status_to_proto(index_status),
4717 worktree_status: tracked_status_to_proto(worktree_status),
4718 }),
4719 };
4720 proto::GitFileStatus {
4721 variant: Some(variant),
4722 }
4723}
4724
4725fn unmerged_status_to_proto(code: UnmergedStatusCode) -> i32 {
4726 match code {
4727 UnmergedStatusCode::Added => proto::GitStatus::Added as _,
4728 UnmergedStatusCode::Deleted => proto::GitStatus::Deleted as _,
4729 UnmergedStatusCode::Updated => proto::GitStatus::Updated as _,
4730 }
4731}
4732
4733fn tracked_status_to_proto(code: StatusCode) -> i32 {
4734 match code {
4735 StatusCode::Added => proto::GitStatus::Added as _,
4736 StatusCode::Deleted => proto::GitStatus::Deleted as _,
4737 StatusCode::Modified => proto::GitStatus::Modified as _,
4738 StatusCode::Renamed => proto::GitStatus::Renamed as _,
4739 StatusCode::TypeChanged => proto::GitStatus::TypeChanged as _,
4740 StatusCode::Copied => proto::GitStatus::Copied as _,
4741 StatusCode::Unmodified => proto::GitStatus::Unmodified as _,
4742 }
4743}