mod conflict_set;
pub mod git_traversal;

use crate::{
    ProjectEnvironment, ProjectItem, ProjectPath,
    buffer_store::{BufferStore, BufferStoreEvent},
    worktree_store::{WorktreeStore, WorktreeStoreEvent},
};
use anyhow::{Context as _, Result, anyhow, bail};
use askpass::AskPassDelegate;
use buffer_diff::{BufferDiff, BufferDiffEvent};
use client::ProjectId;
use collections::HashMap;
pub use conflict_set::{ConflictRegion, ConflictSet, ConflictSetSnapshot, ConflictSetUpdate};
use fs::Fs;
use futures::{
    FutureExt, StreamExt as _,
    channel::{mpsc, oneshot},
    future::{self, Shared},
};
use git::{
    BuildPermalinkParams, GitHostingProviderRegistry, WORK_DIRECTORY_REPO_PATH,
    blame::Blame,
    parse_git_remote_url,
    repository::{
        Branch, CommitDetails, CommitDiff, CommitFile, CommitOptions, DiffType, FetchOptions,
        GitRepository, GitRepositoryCheckpoint, PushOptions, Remote, RemoteCommandOutput, RepoPath,
        ResetMode, UpstreamTrackingStatus,
    },
    status::{
        FileStatus, GitSummary, StatusCode, TrackedStatus, UnmergedStatus, UnmergedStatusCode,
    },
};
use gpui::{
    App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Subscription, Task,
    WeakEntity,
};
use language::{
    Buffer, BufferEvent, Language, LanguageRegistry,
    proto::{deserialize_version, serialize_version},
};
use parking_lot::Mutex;
use postage::stream::Stream as _;
use rpc::{
    AnyProtoClient, TypedEnvelope,
    proto::{self, FromProto, SSH_PROJECT_ID, ToProto, git_reset, split_repository_update},
};
use serde::Deserialize;
use std::{
    cmp::Ordering,
    collections::{BTreeSet, VecDeque},
    future::Future,
    mem,
    ops::Range,
    path::{Path, PathBuf},
    sync::{
        Arc,
        atomic::{self, AtomicU64},
    },
    time::Instant,
};
use sum_tree::{Edit, SumTree, TreeSet};
use text::{Bias, BufferId};
use util::{ResultExt, debug_panic, post_inc};
use worktree::{
    File, PathKey, PathProgress, PathSummary, PathTarget, UpdatedGitRepositoriesSet,
    UpdatedGitRepository, Worktree,
};
69
pub struct GitStore {
    state: GitStoreState,
    buffer_store: Entity<BufferStore>,
    worktree_store: Entity<WorktreeStore>,
    repositories: HashMap<RepositoryId, Entity<Repository>>,
    active_repo_id: Option<RepositoryId>,
    #[allow(clippy::type_complexity)]
    loading_diffs:
        HashMap<(BufferId, DiffKind), Shared<Task<Result<Entity<BufferDiff>, Arc<anyhow::Error>>>>>,
    diffs: HashMap<BufferId, Entity<BufferGitState>>,
    shared_diffs: HashMap<proto::PeerId, HashMap<BufferId, SharedDiffs>>,
    _subscriptions: Vec<Subscription>,
}

#[derive(Default)]
struct SharedDiffs {
    unstaged: Option<Entity<BufferDiff>>,
    uncommitted: Option<Entity<BufferDiff>>,
}

struct BufferGitState {
    unstaged_diff: Option<WeakEntity<BufferDiff>>,
    uncommitted_diff: Option<WeakEntity<BufferDiff>>,
    conflict_set: Option<WeakEntity<ConflictSet>>,
    recalculate_diff_task: Option<Task<Result<()>>>,
    reparse_conflict_markers_task: Option<Task<Result<()>>>,
    language: Option<Arc<Language>>,
    language_registry: Option<Arc<LanguageRegistry>>,
    conflict_updated_futures: Vec<oneshot::Sender<()>>,
    recalculating_tx: postage::watch::Sender<bool>,

    /// These operation counts are used to ensure that head and index text
    /// values read from the git repository are up-to-date with any hunk staging
    /// operations that have been performed on the BufferDiff.
    ///
    /// The operation count is incremented immediately when the user initiates a
    /// hunk stage/unstage operation. Then, upon finishing writing the new index
    /// text to disk, the `operation count as of write` is updated to reflect
    /// the operation count that prompted the write.
    hunk_staging_operation_count: usize,
    hunk_staging_operation_count_as_of_write: usize,
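    // A rough sketch of the intended ordering (simplified and hypothetical in
    // its naming: `count_at_job_start` stands in for the value captured when
    // the write was scheduled):
    //
    //     // user stages or unstages a hunk:
    //     self.hunk_staging_operation_count += 1;
    //     // ...a background job writes the new index text to disk, then:
    //     self.hunk_staging_operation_count_as_of_write = count_at_job_start;
    //
    // Presumably, freshly read head/index text is only treated as current when
    // the two counters agree, i.e. when no staging operation is still in flight.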

    head_text: Option<Arc<String>>,
    index_text: Option<Arc<String>>,
    head_changed: bool,
    index_changed: bool,
    language_changed: bool,
}
118
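/// Which diff base texts changed for a buffer: only the index text, only the
/// HEAD text, each to a different value, or both to the same value.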
119#[derive(Clone, Debug)]
120enum DiffBasesChange {
121 SetIndex(Option<String>),
122 SetHead(Option<String>),
123 SetEach {
124 index: Option<String>,
125 head: Option<String>,
126 },
127 SetBoth(Option<String>),
128}
129
130#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
131enum DiffKind {
132 Unstaged,
133 Uncommitted,
134}
135
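/// How this store reaches the underlying repositories. A brief orientation,
/// based on how the constructors below are used: `Local` owns repositories on
/// this machine, `Ssh` forwards requests to an upstream project over SSH, and
/// `Remote` forwards them to a collaboration host; the `downstream` fields hold
/// the client of any collaborator this project is being shared with.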
136enum GitStoreState {
137 Local {
138 next_repository_id: Arc<AtomicU64>,
139 downstream: Option<LocalDownstreamState>,
140 project_environment: Entity<ProjectEnvironment>,
141 fs: Arc<dyn Fs>,
142 },
143 Ssh {
144 upstream_client: AnyProtoClient,
145 upstream_project_id: ProjectId,
146 downstream: Option<(AnyProtoClient, ProjectId)>,
147 },
148 Remote {
149 upstream_client: AnyProtoClient,
150 upstream_project_id: ProjectId,
151 },
152}
153
154enum DownstreamUpdate {
155 UpdateRepository(RepositorySnapshot),
156 RemoveRepository(RepositoryId),
157}
158
159struct LocalDownstreamState {
160 client: AnyProtoClient,
161 project_id: ProjectId,
162 updates_tx: mpsc::UnboundedSender<DownstreamUpdate>,
163 _task: Task<Result<()>>,
164}
165
166#[derive(Clone, Debug)]
167pub struct GitStoreCheckpoint {
168 checkpoints_by_work_dir_abs_path: HashMap<Arc<Path>, GitRepositoryCheckpoint>,
169}
170
171#[derive(Clone, Debug, PartialEq, Eq)]
172pub struct StatusEntry {
173 pub repo_path: RepoPath,
174 pub status: FileStatus,
175}
176
177impl StatusEntry {
178 fn to_proto(&self) -> proto::StatusEntry {
179 let simple_status = match self.status {
180 FileStatus::Ignored | FileStatus::Untracked => proto::GitStatus::Added as i32,
181 FileStatus::Unmerged { .. } => proto::GitStatus::Conflict as i32,
182 FileStatus::Tracked(TrackedStatus {
183 index_status,
184 worktree_status,
185 }) => tracked_status_to_proto(if worktree_status != StatusCode::Unmodified {
186 worktree_status
187 } else {
188 index_status
189 }),
190 };
191
192 proto::StatusEntry {
193 repo_path: self.repo_path.as_ref().to_proto(),
194 simple_status,
195 status: Some(status_to_proto(self.status)),
196 }
197 }
198}
199
200impl TryFrom<proto::StatusEntry> for StatusEntry {
201 type Error = anyhow::Error;
202
203 fn try_from(value: proto::StatusEntry) -> Result<Self, Self::Error> {
204 let repo_path = RepoPath(Arc::<Path>::from_proto(value.repo_path));
205 let status = status_from_proto(value.simple_status, value.status)?;
206 Ok(Self { repo_path, status })
207 }
208}
209
210impl sum_tree::Item for StatusEntry {
211 type Summary = PathSummary<GitSummary>;
212
213 fn summary(&self, _: &<Self::Summary as sum_tree::Summary>::Context) -> Self::Summary {
214 PathSummary {
215 max_path: self.repo_path.0.clone(),
216 item_summary: self.status.summary(),
217 }
218 }
219}
220
221impl sum_tree::KeyedItem for StatusEntry {
222 type Key = PathKey;
223
224 fn key(&self) -> Self::Key {
225 PathKey(self.repo_path.0.clone())
226 }
227}
228
229#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
230pub struct RepositoryId(pub u64);
231
232#[derive(Clone, Debug, Default, PartialEq, Eq)]
233pub struct MergeDetails {
234 pub conflicted_paths: TreeSet<RepoPath>,
235 pub message: Option<SharedString>,
236 pub heads: Vec<Option<SharedString>>,
237}
238
239#[derive(Clone, Debug, PartialEq, Eq)]
240pub struct RepositorySnapshot {
241 pub id: RepositoryId,
242 pub statuses_by_path: SumTree<StatusEntry>,
243 pub work_directory_abs_path: Arc<Path>,
244 pub branch: Option<Branch>,
245 pub head_commit: Option<CommitDetails>,
246 pub scan_id: u64,
247 pub merge: MergeDetails,
248}
249
250type JobId = u64;
251
252#[derive(Clone, Debug, PartialEq, Eq)]
253pub struct JobInfo {
254 pub start: Instant,
255 pub message: SharedString,
256}
257
258pub struct Repository {
259 this: WeakEntity<Self>,
260 snapshot: RepositorySnapshot,
261 commit_message_buffer: Option<Entity<Buffer>>,
262 git_store: WeakEntity<GitStore>,
263 // For a local repository, holds paths that have had worktree events since the last status scan completed,
264 // and that should be examined during the next status scan.
265 paths_needing_status_update: BTreeSet<RepoPath>,
266 job_sender: mpsc::UnboundedSender<GitJob>,
267 active_jobs: HashMap<JobId, JobInfo>,
268 job_id: JobId,
269 askpass_delegates: Arc<Mutex<HashMap<u64, AskPassDelegate>>>,
270 latest_askpass_id: u64,
271}
272
273impl std::ops::Deref for Repository {
274 type Target = RepositorySnapshot;
275
276 fn deref(&self) -> &Self::Target {
277 &self.snapshot
278 }
279}
280
281#[derive(Clone)]
282pub enum RepositoryState {
283 Local {
284 backend: Arc<dyn GitRepository>,
285 environment: Arc<HashMap<String, String>>,
286 },
287 Remote {
288 project_id: ProjectId,
289 client: AnyProtoClient,
290 },
291}
292
293#[derive(Clone, Debug)]
294pub enum RepositoryEvent {
295 Updated { full_scan: bool, new_instance: bool },
296 MergeHeadsChanged,
297}
298
299#[derive(Clone, Debug)]
300pub struct JobsUpdated;
301
302#[derive(Debug)]
303pub enum GitStoreEvent {
304 ActiveRepositoryChanged(Option<RepositoryId>),
305 RepositoryUpdated(RepositoryId, RepositoryEvent, bool),
306 RepositoryAdded(RepositoryId),
307 RepositoryRemoved(RepositoryId),
308 IndexWriteError(anyhow::Error),
309 JobsUpdated,
310 ConflictsUpdated,
311}
312
313impl EventEmitter<RepositoryEvent> for Repository {}
314impl EventEmitter<JobsUpdated> for Repository {}
315impl EventEmitter<GitStoreEvent> for GitStore {}
316
317pub struct GitJob {
318 job: Box<dyn FnOnce(RepositoryState, &mut AsyncApp) -> Task<()>>,
319 key: Option<GitJobKey>,
320}
321
322#[derive(PartialEq, Eq)]
323enum GitJobKey {
324 WriteIndex(RepoPath),
325 ReloadBufferDiffBases,
326 RefreshStatuses,
327 ReloadGitState,
328}
329
330impl GitStore {
331 pub fn local(
332 worktree_store: &Entity<WorktreeStore>,
333 buffer_store: Entity<BufferStore>,
334 environment: Entity<ProjectEnvironment>,
335 fs: Arc<dyn Fs>,
336 cx: &mut Context<Self>,
337 ) -> Self {
338 Self::new(
339 worktree_store.clone(),
340 buffer_store,
341 GitStoreState::Local {
342 next_repository_id: Arc::new(AtomicU64::new(1)),
343 downstream: None,
344 project_environment: environment,
345 fs,
346 },
347 cx,
348 )
349 }
350
351 pub fn remote(
352 worktree_store: &Entity<WorktreeStore>,
353 buffer_store: Entity<BufferStore>,
354 upstream_client: AnyProtoClient,
355 project_id: ProjectId,
356 cx: &mut Context<Self>,
357 ) -> Self {
358 Self::new(
359 worktree_store.clone(),
360 buffer_store,
361 GitStoreState::Remote {
362 upstream_client,
363 upstream_project_id: project_id,
364 },
365 cx,
366 )
367 }
368
369 pub fn ssh(
370 worktree_store: &Entity<WorktreeStore>,
371 buffer_store: Entity<BufferStore>,
372 upstream_client: AnyProtoClient,
373 cx: &mut Context<Self>,
374 ) -> Self {
375 Self::new(
376 worktree_store.clone(),
377 buffer_store,
378 GitStoreState::Ssh {
379 upstream_client,
380 upstream_project_id: ProjectId(SSH_PROJECT_ID),
381 downstream: None,
382 },
383 cx,
384 )
385 }
386
387 fn new(
388 worktree_store: Entity<WorktreeStore>,
389 buffer_store: Entity<BufferStore>,
390 state: GitStoreState,
391 cx: &mut Context<Self>,
392 ) -> Self {
393 let _subscriptions = vec![
394 cx.subscribe(&worktree_store, Self::on_worktree_store_event),
395 cx.subscribe(&buffer_store, Self::on_buffer_store_event),
396 ];
397
398 GitStore {
399 state,
400 buffer_store,
401 worktree_store,
402 repositories: HashMap::default(),
403 active_repo_id: None,
404 _subscriptions,
405 loading_diffs: HashMap::default(),
406 shared_diffs: HashMap::default(),
407 diffs: HashMap::default(),
408 }
409 }
410
411 pub fn init(client: &AnyProtoClient) {
412 client.add_entity_request_handler(Self::handle_get_remotes);
413 client.add_entity_request_handler(Self::handle_get_branches);
414 client.add_entity_request_handler(Self::handle_change_branch);
415 client.add_entity_request_handler(Self::handle_create_branch);
416 client.add_entity_request_handler(Self::handle_git_init);
417 client.add_entity_request_handler(Self::handle_push);
418 client.add_entity_request_handler(Self::handle_pull);
419 client.add_entity_request_handler(Self::handle_fetch);
420 client.add_entity_request_handler(Self::handle_stage);
421 client.add_entity_request_handler(Self::handle_unstage);
422 client.add_entity_request_handler(Self::handle_commit);
423 client.add_entity_request_handler(Self::handle_reset);
424 client.add_entity_request_handler(Self::handle_show);
425 client.add_entity_request_handler(Self::handle_load_commit_diff);
426 client.add_entity_request_handler(Self::handle_checkout_files);
427 client.add_entity_request_handler(Self::handle_open_commit_message_buffer);
428 client.add_entity_request_handler(Self::handle_set_index_text);
429 client.add_entity_request_handler(Self::handle_askpass);
430 client.add_entity_request_handler(Self::handle_check_for_pushed_commits);
431 client.add_entity_request_handler(Self::handle_git_diff);
432 client.add_entity_request_handler(Self::handle_open_unstaged_diff);
433 client.add_entity_request_handler(Self::handle_open_uncommitted_diff);
434 client.add_entity_message_handler(Self::handle_update_diff_bases);
435 client.add_entity_request_handler(Self::handle_get_permalink_to_line);
436 client.add_entity_request_handler(Self::handle_blame_buffer);
437 client.add_entity_message_handler(Self::handle_update_repository);
438 client.add_entity_message_handler(Self::handle_remove_repository);
439 }
440
441 pub fn is_local(&self) -> bool {
442 matches!(self.state, GitStoreState::Local { .. })
443 }
444
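    /// Starts sharing repository state with a downstream (collaborator) client:
    /// sends an initial snapshot of every repository and, for local projects,
    /// spawns a task that forwards subsequent repository updates and removals.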
445 pub fn shared(&mut self, project_id: u64, client: AnyProtoClient, cx: &mut Context<Self>) {
446 match &mut self.state {
447 GitStoreState::Ssh {
448 downstream: downstream_client,
449 ..
450 } => {
451 for repo in self.repositories.values() {
452 let update = repo.read(cx).snapshot.initial_update(project_id);
453 for update in split_repository_update(update) {
454 client.send(update).log_err();
455 }
456 }
457 *downstream_client = Some((client, ProjectId(project_id)));
458 }
459 GitStoreState::Local {
460 downstream: downstream_client,
461 ..
462 } => {
463 let mut snapshots = HashMap::default();
464 let (updates_tx, mut updates_rx) = mpsc::unbounded();
465 for repo in self.repositories.values() {
466 updates_tx
467 .unbounded_send(DownstreamUpdate::UpdateRepository(
468 repo.read(cx).snapshot.clone(),
469 ))
470 .ok();
471 }
472 *downstream_client = Some(LocalDownstreamState {
473 client: client.clone(),
474 project_id: ProjectId(project_id),
475 updates_tx,
476 _task: cx.spawn(async move |this, cx| {
477 cx.background_spawn(async move {
478 while let Some(update) = updates_rx.next().await {
479 match update {
480 DownstreamUpdate::UpdateRepository(snapshot) => {
481 if let Some(old_snapshot) = snapshots.get_mut(&snapshot.id)
482 {
483 let update =
484 snapshot.build_update(old_snapshot, project_id);
485 *old_snapshot = snapshot;
486 for update in split_repository_update(update) {
487 client.send(update)?;
488 }
489 } else {
490 let update = snapshot.initial_update(project_id);
491 for update in split_repository_update(update) {
492 client.send(update)?;
493 }
494 snapshots.insert(snapshot.id, snapshot);
495 }
496 }
497 DownstreamUpdate::RemoveRepository(id) => {
498 client.send(proto::RemoveRepository {
499 project_id,
500 id: id.to_proto(),
501 })?;
502 }
503 }
504 }
505 anyhow::Ok(())
506 })
507 .await
508 .ok();
509 this.update(cx, |this, _| {
510 if let GitStoreState::Local {
511 downstream: downstream_client,
512 ..
513 } = &mut this.state
514 {
515 downstream_client.take();
516 } else {
517 unreachable!("unshared called on remote store");
518 }
519 })
520 }),
521 });
522 }
523 GitStoreState::Remote { .. } => {
524 debug_panic!("shared called on remote store");
525 }
526 }
527 }
528
529 pub fn unshared(&mut self, _cx: &mut Context<Self>) {
530 match &mut self.state {
531 GitStoreState::Local {
532 downstream: downstream_client,
533 ..
534 } => {
535 downstream_client.take();
536 }
537 GitStoreState::Ssh {
538 downstream: downstream_client,
539 ..
540 } => {
541 downstream_client.take();
542 }
543 GitStoreState::Remote { .. } => {
544 debug_panic!("unshared called on remote store");
545 }
546 }
547 self.shared_diffs.clear();
548 }
549
550 pub(crate) fn forget_shared_diffs_for(&mut self, peer_id: &proto::PeerId) {
551 self.shared_diffs.remove(peer_id);
552 }
553
554 pub fn active_repository(&self) -> Option<Entity<Repository>> {
555 self.active_repo_id
556 .as_ref()
557 .map(|id| self.repositories[&id].clone())
558 }
559
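    /// Returns a [`BufferDiff`] of the buffer's contents against its staged
    /// (index) text, loading the staged text and creating the diff if needed.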
560 pub fn open_unstaged_diff(
561 &mut self,
562 buffer: Entity<Buffer>,
563 cx: &mut Context<Self>,
564 ) -> Task<Result<Entity<BufferDiff>>> {
565 let buffer_id = buffer.read(cx).remote_id();
566 if let Some(diff_state) = self.diffs.get(&buffer_id) {
567 if let Some(unstaged_diff) = diff_state
568 .read(cx)
569 .unstaged_diff
570 .as_ref()
571 .and_then(|weak| weak.upgrade())
572 {
573 if let Some(task) =
574 diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
575 {
576 return cx.background_executor().spawn(async move {
577 task.await;
578 Ok(unstaged_diff)
579 });
580 }
581 return Task::ready(Ok(unstaged_diff));
582 }
583 }
584
585 let Some((repo, repo_path)) =
586 self.repository_and_path_for_buffer_id(buffer.read(cx).remote_id(), cx)
587 else {
588 return Task::ready(Err(anyhow!("failed to find git repository for buffer")));
589 };
590
591 let task = self
592 .loading_diffs
593 .entry((buffer_id, DiffKind::Unstaged))
594 .or_insert_with(|| {
595 let staged_text = repo.update(cx, |repo, cx| {
596 repo.load_staged_text(buffer_id, repo_path, cx)
597 });
598 cx.spawn(async move |this, cx| {
599 Self::open_diff_internal(
600 this,
601 DiffKind::Unstaged,
602 staged_text.await.map(DiffBasesChange::SetIndex),
603 buffer,
604 cx,
605 )
606 .await
607 .map_err(Arc::new)
608 })
609 .shared()
610 })
611 .clone();
612
613 cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
614 }
615
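    /// Returns a [`BufferDiff`] of the buffer's contents against its committed
    /// (HEAD) text, with the unstaged diff attached as its secondary diff.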
616 pub fn open_uncommitted_diff(
617 &mut self,
618 buffer: Entity<Buffer>,
619 cx: &mut Context<Self>,
620 ) -> Task<Result<Entity<BufferDiff>>> {
621 let buffer_id = buffer.read(cx).remote_id();
622
623 if let Some(diff_state) = self.diffs.get(&buffer_id) {
624 if let Some(uncommitted_diff) = diff_state
625 .read(cx)
626 .uncommitted_diff
627 .as_ref()
628 .and_then(|weak| weak.upgrade())
629 {
630 if let Some(task) =
631 diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
632 {
633 return cx.background_executor().spawn(async move {
634 task.await;
635 Ok(uncommitted_diff)
636 });
637 }
638 return Task::ready(Ok(uncommitted_diff));
639 }
640 }
641
642 let Some((repo, repo_path)) =
643 self.repository_and_path_for_buffer_id(buffer.read(cx).remote_id(), cx)
644 else {
645 return Task::ready(Err(anyhow!("failed to find git repository for buffer")));
646 };
647
648 let task = self
649 .loading_diffs
650 .entry((buffer_id, DiffKind::Uncommitted))
651 .or_insert_with(|| {
652 let changes = repo.update(cx, |repo, cx| {
653 repo.load_committed_text(buffer_id, repo_path, cx)
654 });
655
656 cx.spawn(async move |this, cx| {
657 Self::open_diff_internal(this, DiffKind::Uncommitted, changes.await, buffer, cx)
658 .await
659 .map_err(Arc::new)
660 })
661 .shared()
662 })
663 .clone();
664
665 cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
666 }
667
668 async fn open_diff_internal(
669 this: WeakEntity<Self>,
670 kind: DiffKind,
671 texts: Result<DiffBasesChange>,
672 buffer_entity: Entity<Buffer>,
673 cx: &mut AsyncApp,
674 ) -> Result<Entity<BufferDiff>> {
675 let diff_bases_change = match texts {
676 Err(e) => {
677 this.update(cx, |this, cx| {
678 let buffer = buffer_entity.read(cx);
679 let buffer_id = buffer.remote_id();
680 this.loading_diffs.remove(&(buffer_id, kind));
681 })?;
682 return Err(e);
683 }
684 Ok(change) => change,
685 };
686
687 this.update(cx, |this, cx| {
688 let buffer = buffer_entity.read(cx);
689 let buffer_id = buffer.remote_id();
690 let language = buffer.language().cloned();
691 let language_registry = buffer.language_registry();
692 let text_snapshot = buffer.text_snapshot();
693 this.loading_diffs.remove(&(buffer_id, kind));
694
695 let git_store = cx.weak_entity();
696 let diff_state = this
697 .diffs
698 .entry(buffer_id)
699 .or_insert_with(|| cx.new(|_| BufferGitState::new(git_store)));
700
701 let diff = cx.new(|cx| BufferDiff::new(&text_snapshot, cx));
702
703 cx.subscribe(&diff, Self::on_buffer_diff_event).detach();
704 diff_state.update(cx, |diff_state, cx| {
705 diff_state.language = language;
706 diff_state.language_registry = language_registry;
707
708 match kind {
709 DiffKind::Unstaged => diff_state.unstaged_diff = Some(diff.downgrade()),
710 DiffKind::Uncommitted => {
711 let unstaged_diff = if let Some(diff) = diff_state.unstaged_diff() {
712 diff
713 } else {
714 let unstaged_diff = cx.new(|cx| BufferDiff::new(&text_snapshot, cx));
715 diff_state.unstaged_diff = Some(unstaged_diff.downgrade());
716 unstaged_diff
717 };
718
719 diff.update(cx, |diff, _| diff.set_secondary_diff(unstaged_diff));
720 diff_state.uncommitted_diff = Some(diff.downgrade())
721 }
722 }
723
724 diff_state.diff_bases_changed(text_snapshot, Some(diff_bases_change), cx);
725 let rx = diff_state.wait_for_recalculation();
726
727 anyhow::Ok(async move {
728 if let Some(rx) = rx {
729 rx.await;
730 }
731 Ok(diff)
732 })
733 })
734 })??
735 .await
736 }
737
738 pub fn get_unstaged_diff(&self, buffer_id: BufferId, cx: &App) -> Option<Entity<BufferDiff>> {
739 let diff_state = self.diffs.get(&buffer_id)?;
740 diff_state.read(cx).unstaged_diff.as_ref()?.upgrade()
741 }
742
743 pub fn get_uncommitted_diff(
744 &self,
745 buffer_id: BufferId,
746 cx: &App,
747 ) -> Option<Entity<BufferDiff>> {
748 let diff_state = self.diffs.get(&buffer_id)?;
749 diff_state.read(cx).uncommitted_diff.as_ref()?.upgrade()
750 }
751
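    /// Returns the [`ConflictSet`] tracking merge-conflict regions in the given
    /// buffer, creating it if necessary and scheduling a reparse of the
    /// buffer's conflict markers.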
752 pub fn open_conflict_set(
753 &mut self,
754 buffer: Entity<Buffer>,
755 cx: &mut Context<Self>,
756 ) -> Entity<ConflictSet> {
757 log::debug!("open conflict set");
758 let buffer_id = buffer.read(cx).remote_id();
759
760 if let Some(git_state) = self.diffs.get(&buffer_id) {
761 if let Some(conflict_set) = git_state
762 .read(cx)
763 .conflict_set
764 .as_ref()
765 .and_then(|weak| weak.upgrade())
766 {
767 let conflict_set = conflict_set.clone();
768 let buffer_snapshot = buffer.read(cx).text_snapshot();
769
770 git_state.update(cx, |state, cx| {
771 let _ = state.reparse_conflict_markers(buffer_snapshot, cx);
772 });
773
774 return conflict_set;
775 }
776 }
777
778 let is_unmerged = self
779 .repository_and_path_for_buffer_id(buffer_id, cx)
780 .map_or(false, |(repo, path)| {
781 repo.read(cx).snapshot.has_conflict(&path)
782 });
783 let git_store = cx.weak_entity();
784 let buffer_git_state = self
785 .diffs
786 .entry(buffer_id)
787 .or_insert_with(|| cx.new(|_| BufferGitState::new(git_store)));
788 let conflict_set = cx.new(|cx| ConflictSet::new(buffer_id, is_unmerged, cx));
789
790 self._subscriptions
791 .push(cx.subscribe(&conflict_set, |_, _, _, cx| {
792 cx.emit(GitStoreEvent::ConflictsUpdated);
793 }));
794
795 buffer_git_state.update(cx, |state, cx| {
796 state.conflict_set = Some(conflict_set.downgrade());
797 let buffer_snapshot = buffer.read(cx).text_snapshot();
798 let _ = state.reparse_conflict_markers(buffer_snapshot, cx);
799 });
800
801 conflict_set
802 }
803
804 pub fn project_path_git_status(
805 &self,
806 project_path: &ProjectPath,
807 cx: &App,
808 ) -> Option<FileStatus> {
809 let (repo, repo_path) = self.repository_and_path_for_project_path(project_path, cx)?;
810 Some(repo.read(cx).status_for_path(&repo_path)?.status)
811 }
812
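    /// Captures a checkpoint of every repository in the store, keyed by work
    /// directory, for later comparison or restoration.
    ///
    /// A hypothetical caller (simplified sketch, not taken from real usage):
    ///
    /// ```ignore
    /// let checkpoint = git_store
    ///     .update(cx, |store, cx| store.checkpoint(cx))
    ///     .await?;
    /// // ... mutate the working trees ...
    /// git_store
    ///     .update(cx, |store, cx| store.restore_checkpoint(checkpoint, cx))
    ///     .await?;
    /// ```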
813 pub fn checkpoint(&self, cx: &mut App) -> Task<Result<GitStoreCheckpoint>> {
814 let mut work_directory_abs_paths = Vec::new();
815 let mut checkpoints = Vec::new();
816 for repository in self.repositories.values() {
817 repository.update(cx, |repository, _| {
818 work_directory_abs_paths.push(repository.snapshot.work_directory_abs_path.clone());
819 checkpoints.push(repository.checkpoint().map(|checkpoint| checkpoint?));
820 });
821 }
822
823 cx.background_executor().spawn(async move {
824 let checkpoints = future::try_join_all(checkpoints).await?;
825 Ok(GitStoreCheckpoint {
826 checkpoints_by_work_dir_abs_path: work_directory_abs_paths
827 .into_iter()
828 .zip(checkpoints)
829 .collect(),
830 })
831 })
832 }
833
834 pub fn restore_checkpoint(
835 &self,
836 checkpoint: GitStoreCheckpoint,
837 cx: &mut App,
838 ) -> Task<Result<()>> {
839 let repositories_by_work_dir_abs_path = self
840 .repositories
841 .values()
842 .map(|repo| (repo.read(cx).snapshot.work_directory_abs_path.clone(), repo))
843 .collect::<HashMap<_, _>>();
844
845 let mut tasks = Vec::new();
846 for (work_dir_abs_path, checkpoint) in checkpoint.checkpoints_by_work_dir_abs_path {
847 if let Some(repository) = repositories_by_work_dir_abs_path.get(&work_dir_abs_path) {
848 let restore = repository.update(cx, |repository, _| {
849 repository.restore_checkpoint(checkpoint)
850 });
851 tasks.push(async move { restore.await? });
852 }
853 }
854 cx.background_spawn(async move {
855 future::try_join_all(tasks).await?;
856 Ok(())
857 })
858 }
859
860 /// Compares two checkpoints, returning true if they are equal.
861 pub fn compare_checkpoints(
862 &self,
863 left: GitStoreCheckpoint,
864 mut right: GitStoreCheckpoint,
865 cx: &mut App,
866 ) -> Task<Result<bool>> {
867 let repositories_by_work_dir_abs_path = self
868 .repositories
869 .values()
870 .map(|repo| (repo.read(cx).snapshot.work_directory_abs_path.clone(), repo))
871 .collect::<HashMap<_, _>>();
872
873 let mut tasks = Vec::new();
874 for (work_dir_abs_path, left_checkpoint) in left.checkpoints_by_work_dir_abs_path {
875 if let Some(right_checkpoint) = right
876 .checkpoints_by_work_dir_abs_path
877 .remove(&work_dir_abs_path)
878 {
879 if let Some(repository) = repositories_by_work_dir_abs_path.get(&work_dir_abs_path)
880 {
881 let compare = repository.update(cx, |repository, _| {
882 repository.compare_checkpoints(left_checkpoint, right_checkpoint)
883 });
884
885 tasks.push(async move { compare.await? });
886 }
887 } else {
888 return Task::ready(Ok(false));
889 }
890 }
891 cx.background_spawn(async move {
892 Ok(future::try_join_all(tasks)
893 .await?
894 .into_iter()
895 .all(|result| result))
896 })
897 }
898
899 /// Blames a buffer.
900 pub fn blame_buffer(
901 &self,
902 buffer: &Entity<Buffer>,
903 version: Option<clock::Global>,
904 cx: &mut App,
905 ) -> Task<Result<Option<Blame>>> {
906 let buffer = buffer.read(cx);
907 let Some((repo, repo_path)) =
908 self.repository_and_path_for_buffer_id(buffer.remote_id(), cx)
909 else {
910 return Task::ready(Err(anyhow!("failed to find a git repository for buffer")));
911 };
912 let content = match &version {
913 Some(version) => buffer.rope_for_version(version).clone(),
914 None => buffer.as_rope().clone(),
915 };
916 let version = version.unwrap_or(buffer.version());
917 let buffer_id = buffer.remote_id();
918
919 let rx = repo.update(cx, |repo, _| {
920 repo.send_job(None, move |state, _| async move {
921 match state {
922 RepositoryState::Local { backend, .. } => backend
923 .blame(repo_path.clone(), content)
924 .await
925 .with_context(|| format!("Failed to blame {:?}", repo_path.0))
926 .map(Some),
927 RepositoryState::Remote { project_id, client } => {
928 let response = client
929 .request(proto::BlameBuffer {
930 project_id: project_id.to_proto(),
931 buffer_id: buffer_id.into(),
932 version: serialize_version(&version),
933 })
934 .await?;
935 Ok(deserialize_blame_buffer_response(response))
936 }
937 }
938 })
939 });
940
941 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
942 }
943
944 pub fn get_permalink_to_line(
945 &self,
946 buffer: &Entity<Buffer>,
947 selection: Range<u32>,
948 cx: &mut App,
949 ) -> Task<Result<url::Url>> {
950 let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
951 return Task::ready(Err(anyhow!("buffer has no file")));
952 };
953
954 let Some((repo, repo_path)) = self.repository_and_path_for_project_path(
955 &(file.worktree.read(cx).id(), file.path.clone()).into(),
956 cx,
957 ) else {
958 // If we're not in a Git repo, check whether this is a Rust source
959 // file in the Cargo registry (presumably opened with go-to-definition
960 // from a normal Rust file). If so, we can put together a permalink
961 // using crate metadata.
962 if buffer
963 .read(cx)
964 .language()
965 .is_none_or(|lang| lang.name() != "Rust".into())
966 {
967 return Task::ready(Err(anyhow!("no permalink available")));
968 }
969 let Some(file_path) = file.worktree.read(cx).absolutize(&file.path).ok() else {
970 return Task::ready(Err(anyhow!("no permalink available")));
971 };
972 return cx.spawn(async move |cx| {
973 let provider_registry = cx.update(GitHostingProviderRegistry::default_global)?;
974 get_permalink_in_rust_registry_src(provider_registry, file_path, selection)
975 .context("no permalink available")
976 });
977
978 // TODO remote case
979 };
980
981 let buffer_id = buffer.read(cx).remote_id();
982 let branch = repo.read(cx).branch.clone();
983 let remote = branch
984 .as_ref()
985 .and_then(|b| b.upstream.as_ref())
986 .and_then(|b| b.remote_name())
987 .unwrap_or("origin")
988 .to_string();
989
990 let rx = repo.update(cx, |repo, _| {
991 repo.send_job(None, move |state, cx| async move {
992 match state {
993 RepositoryState::Local { backend, .. } => {
994 let origin_url = backend
995 .remote_url(&remote)
996 .with_context(|| format!("remote \"{remote}\" not found"))?;
997
998 let sha = backend.head_sha().await.context("reading HEAD SHA")?;
999
1000 let provider_registry =
1001 cx.update(GitHostingProviderRegistry::default_global)?;
1002
1003 let (provider, remote) =
1004 parse_git_remote_url(provider_registry, &origin_url)
1005 .context("parsing Git remote URL")?;
1006
1007 let path = repo_path.to_str().with_context(|| {
1008 format!("converting repo path {repo_path:?} to string")
1009 })?;
1010
1011 Ok(provider.build_permalink(
1012 remote,
1013 BuildPermalinkParams {
1014 sha: &sha,
1015 path,
1016 selection: Some(selection),
1017 },
1018 ))
1019 }
1020 RepositoryState::Remote { project_id, client } => {
1021 let response = client
1022 .request(proto::GetPermalinkToLine {
1023 project_id: project_id.to_proto(),
1024 buffer_id: buffer_id.into(),
1025 selection: Some(proto::Range {
1026 start: selection.start as u64,
1027 end: selection.end as u64,
1028 }),
1029 })
1030 .await?;
1031
1032 url::Url::parse(&response.permalink).context("failed to parse permalink")
1033 }
1034 }
1035 })
1036 });
1037 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
1038 }
1039
1040 fn downstream_client(&self) -> Option<(AnyProtoClient, ProjectId)> {
1041 match &self.state {
1042 GitStoreState::Local {
1043 downstream: downstream_client,
1044 ..
1045 } => downstream_client
1046 .as_ref()
1047 .map(|state| (state.client.clone(), state.project_id)),
1048 GitStoreState::Ssh {
1049 downstream: downstream_client,
1050 ..
1051 } => downstream_client.clone(),
1052 GitStoreState::Remote { .. } => None,
1053 }
1054 }
1055
1056 fn upstream_client(&self) -> Option<AnyProtoClient> {
1057 match &self.state {
1058 GitStoreState::Local { .. } => None,
1059 GitStoreState::Ssh {
1060 upstream_client, ..
1061 }
1062 | GitStoreState::Remote {
1063 upstream_client, ..
1064 } => Some(upstream_client.clone()),
1065 }
1066 }
1067
1068 fn on_worktree_store_event(
1069 &mut self,
1070 worktree_store: Entity<WorktreeStore>,
1071 event: &WorktreeStoreEvent,
1072 cx: &mut Context<Self>,
1073 ) {
1074 let GitStoreState::Local {
1075 project_environment,
1076 downstream,
1077 next_repository_id,
1078 fs,
1079 } = &self.state
1080 else {
1081 return;
1082 };
1083
1084 match event {
1085 WorktreeStoreEvent::WorktreeUpdatedEntries(worktree_id, updated_entries) => {
1086 let mut paths_by_git_repo = HashMap::<_, Vec<_>>::default();
1087 for (relative_path, _, _) in updated_entries.iter() {
1088 let Some((repo, repo_path)) = self.repository_and_path_for_project_path(
1089 &(*worktree_id, relative_path.clone()).into(),
1090 cx,
1091 ) else {
1092 continue;
1093 };
1094 paths_by_git_repo.entry(repo).or_default().push(repo_path)
1095 }
1096
1097 for (repo, paths) in paths_by_git_repo {
1098 repo.update(cx, |repo, cx| {
1099 repo.paths_changed(
1100 paths,
1101 downstream
1102 .as_ref()
1103 .map(|downstream| downstream.updates_tx.clone()),
1104 cx,
1105 );
1106 });
1107 }
1108 }
1109 WorktreeStoreEvent::WorktreeUpdatedGitRepositories(worktree_id, changed_repos) => {
1110 let Some(worktree) = worktree_store.read(cx).worktree_for_id(*worktree_id, cx)
1111 else {
1112 return;
1113 };
1114 if !worktree.read(cx).is_visible() {
1115 log::debug!(
1116 "not adding repositories for local worktree {:?} because it's not visible",
1117 worktree.read(cx).abs_path()
1118 );
1119 return;
1120 }
1121 self.update_repositories_from_worktree(
1122 project_environment.clone(),
1123 next_repository_id.clone(),
1124 downstream
1125 .as_ref()
1126 .map(|downstream| downstream.updates_tx.clone()),
1127 changed_repos.clone(),
1128 fs.clone(),
1129 cx,
1130 );
1131 self.local_worktree_git_repos_changed(worktree, changed_repos, cx);
1132 }
1133 _ => {}
1134 }
1135 }
1136
1137 fn on_repository_event(
1138 &mut self,
1139 repo: Entity<Repository>,
1140 event: &RepositoryEvent,
1141 cx: &mut Context<Self>,
1142 ) {
1143 let id = repo.read(cx).id;
1144 let repo_snapshot = repo.read(cx).snapshot.clone();
1145 for (buffer_id, diff) in self.diffs.iter() {
1146 if let Some((buffer_repo, repo_path)) =
1147 self.repository_and_path_for_buffer_id(*buffer_id, cx)
1148 {
1149 if buffer_repo == repo {
1150 diff.update(cx, |diff, cx| {
1151 if let Some(conflict_set) = &diff.conflict_set {
1152 let conflict_status_changed =
1153 conflict_set.update(cx, |conflict_set, cx| {
1154 let has_conflict = repo_snapshot.has_conflict(&repo_path);
1155 conflict_set.set_has_conflict(has_conflict, cx)
1156 })?;
1157 if conflict_status_changed {
1158 let buffer_store = self.buffer_store.read(cx);
1159 if let Some(buffer) = buffer_store.get(*buffer_id) {
1160 let _ = diff.reparse_conflict_markers(
1161 buffer.read(cx).text_snapshot(),
1162 cx,
1163 );
1164 }
1165 }
1166 }
1167 anyhow::Ok(())
1168 })
1169 .ok();
1170 }
1171 }
1172 }
1173 cx.emit(GitStoreEvent::RepositoryUpdated(
1174 id,
1175 event.clone(),
1176 self.active_repo_id == Some(id),
1177 ))
1178 }
1179
1180 fn on_jobs_updated(&mut self, _: Entity<Repository>, _: &JobsUpdated, cx: &mut Context<Self>) {
1181 cx.emit(GitStoreEvent::JobsUpdated)
1182 }
1183
    /// Update our list of repositories and schedule git scans in response to a notification from a worktree.
1185 fn update_repositories_from_worktree(
1186 &mut self,
1187 project_environment: Entity<ProjectEnvironment>,
1188 next_repository_id: Arc<AtomicU64>,
1189 updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
1190 updated_git_repositories: UpdatedGitRepositoriesSet,
1191 fs: Arc<dyn Fs>,
1192 cx: &mut Context<Self>,
1193 ) {
1194 let mut removed_ids = Vec::new();
1195 for update in updated_git_repositories.iter() {
1196 if let Some((id, existing)) = self.repositories.iter().find(|(_, repo)| {
1197 let existing_work_directory_abs_path =
1198 repo.read(cx).work_directory_abs_path.clone();
1199 Some(&existing_work_directory_abs_path)
1200 == update.old_work_directory_abs_path.as_ref()
1201 || Some(&existing_work_directory_abs_path)
1202 == update.new_work_directory_abs_path.as_ref()
1203 }) {
1204 if let Some(new_work_directory_abs_path) =
1205 update.new_work_directory_abs_path.clone()
1206 {
1207 existing.update(cx, |existing, cx| {
1208 existing.snapshot.work_directory_abs_path = new_work_directory_abs_path;
1209 existing.schedule_scan(updates_tx.clone(), cx);
1210 });
1211 } else {
1212 removed_ids.push(*id);
1213 }
1214 } else if let UpdatedGitRepository {
1215 new_work_directory_abs_path: Some(work_directory_abs_path),
1216 dot_git_abs_path: Some(dot_git_abs_path),
1217 repository_dir_abs_path: Some(repository_dir_abs_path),
1218 common_dir_abs_path: Some(common_dir_abs_path),
1219 ..
1220 } = update
1221 {
1222 let id = RepositoryId(next_repository_id.fetch_add(1, atomic::Ordering::Release));
1223 let git_store = cx.weak_entity();
1224 let repo = cx.new(|cx| {
1225 let mut repo = Repository::local(
1226 id,
1227 work_directory_abs_path.clone(),
1228 dot_git_abs_path.clone(),
1229 repository_dir_abs_path.clone(),
1230 common_dir_abs_path.clone(),
1231 project_environment.downgrade(),
1232 fs.clone(),
1233 git_store,
1234 cx,
1235 );
1236 repo.schedule_scan(updates_tx.clone(), cx);
1237 repo
1238 });
1239 self._subscriptions
1240 .push(cx.subscribe(&repo, Self::on_repository_event));
1241 self._subscriptions
1242 .push(cx.subscribe(&repo, Self::on_jobs_updated));
1243 self.repositories.insert(id, repo);
1244 cx.emit(GitStoreEvent::RepositoryAdded(id));
1245 self.active_repo_id.get_or_insert_with(|| {
1246 cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
1247 id
1248 });
1249 }
1250 }
1251
1252 for id in removed_ids {
1253 if self.active_repo_id == Some(id) {
1254 self.active_repo_id = None;
1255 cx.emit(GitStoreEvent::ActiveRepositoryChanged(None));
1256 }
1257 self.repositories.remove(&id);
1258 if let Some(updates_tx) = updates_tx.as_ref() {
1259 updates_tx
1260 .unbounded_send(DownstreamUpdate::RemoveRepository(id))
1261 .ok();
1262 }
1263 }
1264 }
1265
1266 fn on_buffer_store_event(
1267 &mut self,
1268 _: Entity<BufferStore>,
1269 event: &BufferStoreEvent,
1270 cx: &mut Context<Self>,
1271 ) {
1272 match event {
1273 BufferStoreEvent::BufferAdded(buffer) => {
1274 cx.subscribe(&buffer, |this, buffer, event, cx| {
1275 if let BufferEvent::LanguageChanged = event {
1276 let buffer_id = buffer.read(cx).remote_id();
1277 if let Some(diff_state) = this.diffs.get(&buffer_id) {
1278 diff_state.update(cx, |diff_state, cx| {
1279 diff_state.buffer_language_changed(buffer, cx);
1280 });
1281 }
1282 }
1283 })
1284 .detach();
1285 }
1286 BufferStoreEvent::SharedBufferClosed(peer_id, buffer_id) => {
1287 if let Some(diffs) = self.shared_diffs.get_mut(peer_id) {
1288 diffs.remove(buffer_id);
1289 }
1290 }
1291 BufferStoreEvent::BufferDropped(buffer_id) => {
1292 self.diffs.remove(&buffer_id);
1293 for diffs in self.shared_diffs.values_mut() {
1294 diffs.remove(buffer_id);
1295 }
1296 }
1297
1298 _ => {}
1299 }
1300 }
1301
1302 pub fn recalculate_buffer_diffs(
1303 &mut self,
1304 buffers: Vec<Entity<Buffer>>,
1305 cx: &mut Context<Self>,
1306 ) -> impl Future<Output = ()> + use<> {
1307 let mut futures = Vec::new();
1308 for buffer in buffers {
1309 if let Some(diff_state) = self.diffs.get_mut(&buffer.read(cx).remote_id()) {
1310 let buffer = buffer.read(cx).text_snapshot();
1311 diff_state.update(cx, |diff_state, cx| {
1312 diff_state.recalculate_diffs(buffer.clone(), cx);
1313 futures.extend(diff_state.wait_for_recalculation().map(FutureExt::boxed));
1314 });
1315 futures.push(diff_state.update(cx, |diff_state, cx| {
1316 diff_state
1317 .reparse_conflict_markers(buffer, cx)
1318 .map(|_| {})
1319 .boxed()
1320 }));
1321 }
1322 }
1323 async move {
1324 futures::future::join_all(futures).await;
1325 }
1326 }
1327
1328 fn on_buffer_diff_event(
1329 &mut self,
1330 diff: Entity<buffer_diff::BufferDiff>,
1331 event: &BufferDiffEvent,
1332 cx: &mut Context<Self>,
1333 ) {
1334 if let BufferDiffEvent::HunksStagedOrUnstaged(new_index_text) = event {
1335 let buffer_id = diff.read(cx).buffer_id;
1336 if let Some(diff_state) = self.diffs.get(&buffer_id) {
1337 let hunk_staging_operation_count = diff_state.update(cx, |diff_state, _| {
1338 diff_state.hunk_staging_operation_count += 1;
1339 diff_state.hunk_staging_operation_count
1340 });
1341 if let Some((repo, path)) = self.repository_and_path_for_buffer_id(buffer_id, cx) {
1342 let recv = repo.update(cx, |repo, cx| {
1343 log::debug!("hunks changed for {}", path.display());
1344 repo.spawn_set_index_text_job(
1345 path,
1346 new_index_text.as_ref().map(|rope| rope.to_string()),
1347 Some(hunk_staging_operation_count),
1348 cx,
1349 )
1350 });
1351 let diff = diff.downgrade();
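                    // If the index write fails, clear the optimistically-applied
                    // pending hunk state and surface the error via
                    // `GitStoreEvent::IndexWriteError`.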
1352 cx.spawn(async move |this, cx| {
1353 if let Ok(Err(error)) = cx.background_spawn(recv).await {
1354 diff.update(cx, |diff, cx| {
1355 diff.clear_pending_hunks(cx);
1356 })
1357 .ok();
1358 this.update(cx, |_, cx| cx.emit(GitStoreEvent::IndexWriteError(error)))
1359 .ok();
1360 }
1361 })
1362 .detach();
1363 }
1364 }
1365 }
1366 }
1367
1368 fn local_worktree_git_repos_changed(
1369 &mut self,
1370 worktree: Entity<Worktree>,
1371 changed_repos: &UpdatedGitRepositoriesSet,
1372 cx: &mut Context<Self>,
1373 ) {
1374 log::debug!("local worktree repos changed");
1375 debug_assert!(worktree.read(cx).is_local());
1376
1377 for repository in self.repositories.values() {
1378 repository.update(cx, |repository, cx| {
1379 let repo_abs_path = &repository.work_directory_abs_path;
1380 if changed_repos.iter().any(|update| {
1381 update.old_work_directory_abs_path.as_ref() == Some(&repo_abs_path)
1382 || update.new_work_directory_abs_path.as_ref() == Some(&repo_abs_path)
1383 }) {
1384 repository.reload_buffer_diff_bases(cx);
1385 }
1386 });
1387 }
1388 }
1389
1390 pub fn repositories(&self) -> &HashMap<RepositoryId, Entity<Repository>> {
1391 &self.repositories
1392 }
1393
1394 pub fn status_for_buffer_id(&self, buffer_id: BufferId, cx: &App) -> Option<FileStatus> {
1395 let (repo, path) = self.repository_and_path_for_buffer_id(buffer_id, cx)?;
1396 let status = repo.read(cx).snapshot.status_for_path(&path)?;
1397 Some(status.status)
1398 }
1399
1400 pub fn repository_and_path_for_buffer_id(
1401 &self,
1402 buffer_id: BufferId,
1403 cx: &App,
1404 ) -> Option<(Entity<Repository>, RepoPath)> {
1405 let buffer = self.buffer_store.read(cx).get(buffer_id)?;
1406 let project_path = buffer.read(cx).project_path(cx)?;
1407 self.repository_and_path_for_project_path(&project_path, cx)
1408 }
1409
1410 pub fn repository_and_path_for_project_path(
1411 &self,
1412 path: &ProjectPath,
1413 cx: &App,
1414 ) -> Option<(Entity<Repository>, RepoPath)> {
1415 let abs_path = self.worktree_store.read(cx).absolutize(path, cx)?;
1416 self.repositories
1417 .values()
1418 .filter_map(|repo| {
1419 let repo_path = repo.read(cx).abs_path_to_repo_path(&abs_path)?;
1420 Some((repo.clone(), repo_path))
1421 })
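            // Every candidate work directory is an ancestor of `abs_path`, so
            // taking the maximum path picks the innermost containing repository.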
1422 .max_by_key(|(repo, _)| repo.read(cx).work_directory_abs_path.clone())
1423 }
1424
1425 pub fn git_init(
1426 &self,
1427 path: Arc<Path>,
1428 fallback_branch_name: String,
1429 cx: &App,
1430 ) -> Task<Result<()>> {
1431 match &self.state {
1432 GitStoreState::Local { fs, .. } => {
1433 let fs = fs.clone();
1434 cx.background_executor()
1435 .spawn(async move { fs.git_init(&path, fallback_branch_name) })
1436 }
1437 GitStoreState::Ssh {
1438 upstream_client,
1439 upstream_project_id: project_id,
1440 ..
1441 }
1442 | GitStoreState::Remote {
1443 upstream_client,
1444 upstream_project_id: project_id,
1445 ..
1446 } => {
1447 let client = upstream_client.clone();
1448 let project_id = *project_id;
1449 cx.background_executor().spawn(async move {
1450 client
1451 .request(proto::GitInit {
1452 project_id: project_id.0,
1453 abs_path: path.to_string_lossy().to_string(),
1454 fallback_branch_name,
1455 })
1456 .await?;
1457 Ok(())
1458 })
1459 }
1460 }
1461 }
1462
1463 async fn handle_update_repository(
1464 this: Entity<Self>,
1465 envelope: TypedEnvelope<proto::UpdateRepository>,
1466 mut cx: AsyncApp,
1467 ) -> Result<()> {
1468 this.update(&mut cx, |this, cx| {
1469 let mut update = envelope.payload;
1470
1471 let id = RepositoryId::from_proto(update.id);
1472 let client = this
1473 .upstream_client()
1474 .context("no upstream client")?
1475 .clone();
1476
1477 let mut is_new = false;
1478 let repo = this.repositories.entry(id).or_insert_with(|| {
1479 is_new = true;
1480 let git_store = cx.weak_entity();
1481 cx.new(|cx| {
1482 Repository::remote(
1483 id,
1484 Path::new(&update.abs_path).into(),
1485 ProjectId(update.project_id),
1486 client,
1487 git_store,
1488 cx,
1489 )
1490 })
1491 });
1492 if is_new {
1493 this._subscriptions
1494 .push(cx.subscribe(&repo, Self::on_repository_event))
1495 }
1496
1497 repo.update(cx, {
1498 let update = update.clone();
1499 |repo, cx| repo.apply_remote_update(update, is_new, cx)
1500 })?;
1501
1502 this.active_repo_id.get_or_insert_with(|| {
1503 cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
1504 id
1505 });
1506
1507 if let Some((client, project_id)) = this.downstream_client() {
1508 update.project_id = project_id.to_proto();
1509 client.send(update).log_err();
1510 }
1511 Ok(())
1512 })?
1513 }
1514
1515 async fn handle_remove_repository(
1516 this: Entity<Self>,
1517 envelope: TypedEnvelope<proto::RemoveRepository>,
1518 mut cx: AsyncApp,
1519 ) -> Result<()> {
1520 this.update(&mut cx, |this, cx| {
1521 let mut update = envelope.payload;
1522 let id = RepositoryId::from_proto(update.id);
1523 this.repositories.remove(&id);
1524 if let Some((client, project_id)) = this.downstream_client() {
1525 update.project_id = project_id.to_proto();
1526 client.send(update).log_err();
1527 }
1528 if this.active_repo_id == Some(id) {
1529 this.active_repo_id = None;
1530 cx.emit(GitStoreEvent::ActiveRepositoryChanged(None));
1531 }
1532 cx.emit(GitStoreEvent::RepositoryRemoved(id));
1533 })
1534 }
1535
1536 async fn handle_git_init(
1537 this: Entity<Self>,
1538 envelope: TypedEnvelope<proto::GitInit>,
1539 cx: AsyncApp,
1540 ) -> Result<proto::Ack> {
1541 let path: Arc<Path> = PathBuf::from(envelope.payload.abs_path).into();
1542 let name = envelope.payload.fallback_branch_name;
1543 cx.update(|cx| this.read(cx).git_init(path, name, cx))?
1544 .await?;
1545
1546 Ok(proto::Ack {})
1547 }
1548
1549 async fn handle_fetch(
1550 this: Entity<Self>,
1551 envelope: TypedEnvelope<proto::Fetch>,
1552 mut cx: AsyncApp,
1553 ) -> Result<proto::RemoteMessageResponse> {
1554 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1555 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1556 let fetch_options = FetchOptions::from_proto(envelope.payload.remote);
1557 let askpass_id = envelope.payload.askpass_id;
1558
1559 let askpass = make_remote_delegate(
1560 this,
1561 envelope.payload.project_id,
1562 repository_id,
1563 askpass_id,
1564 &mut cx,
1565 );
1566
1567 let remote_output = repository_handle
1568 .update(&mut cx, |repository_handle, cx| {
1569 repository_handle.fetch(fetch_options, askpass, cx)
1570 })?
1571 .await??;
1572
1573 Ok(proto::RemoteMessageResponse {
1574 stdout: remote_output.stdout,
1575 stderr: remote_output.stderr,
1576 })
1577 }
1578
1579 async fn handle_push(
1580 this: Entity<Self>,
1581 envelope: TypedEnvelope<proto::Push>,
1582 mut cx: AsyncApp,
1583 ) -> Result<proto::RemoteMessageResponse> {
1584 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1585 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1586
1587 let askpass_id = envelope.payload.askpass_id;
1588 let askpass = make_remote_delegate(
1589 this,
1590 envelope.payload.project_id,
1591 repository_id,
1592 askpass_id,
1593 &mut cx,
1594 );
1595
1596 let options = envelope
1597 .payload
1598 .options
1599 .as_ref()
1600 .map(|_| match envelope.payload.options() {
1601 proto::push::PushOptions::SetUpstream => git::repository::PushOptions::SetUpstream,
1602 proto::push::PushOptions::Force => git::repository::PushOptions::Force,
1603 });
1604
1605 let branch_name = envelope.payload.branch_name.into();
1606 let remote_name = envelope.payload.remote_name.into();
1607
1608 let remote_output = repository_handle
1609 .update(&mut cx, |repository_handle, cx| {
1610 repository_handle.push(branch_name, remote_name, options, askpass, cx)
1611 })?
1612 .await??;
1613 Ok(proto::RemoteMessageResponse {
1614 stdout: remote_output.stdout,
1615 stderr: remote_output.stderr,
1616 })
1617 }
1618
1619 async fn handle_pull(
1620 this: Entity<Self>,
1621 envelope: TypedEnvelope<proto::Pull>,
1622 mut cx: AsyncApp,
1623 ) -> Result<proto::RemoteMessageResponse> {
1624 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1625 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1626 let askpass_id = envelope.payload.askpass_id;
1627 let askpass = make_remote_delegate(
1628 this,
1629 envelope.payload.project_id,
1630 repository_id,
1631 askpass_id,
1632 &mut cx,
1633 );
1634
1635 let branch_name = envelope.payload.branch_name.into();
1636 let remote_name = envelope.payload.remote_name.into();
1637
1638 let remote_message = repository_handle
1639 .update(&mut cx, |repository_handle, cx| {
1640 repository_handle.pull(branch_name, remote_name, askpass, cx)
1641 })?
1642 .await??;
1643
1644 Ok(proto::RemoteMessageResponse {
1645 stdout: remote_message.stdout,
1646 stderr: remote_message.stderr,
1647 })
1648 }
1649
1650 async fn handle_stage(
1651 this: Entity<Self>,
1652 envelope: TypedEnvelope<proto::Stage>,
1653 mut cx: AsyncApp,
1654 ) -> Result<proto::Ack> {
1655 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1656 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1657
1658 let entries = envelope
1659 .payload
1660 .paths
1661 .into_iter()
1662 .map(PathBuf::from)
1663 .map(RepoPath::new)
1664 .collect();
1665
1666 repository_handle
1667 .update(&mut cx, |repository_handle, cx| {
1668 repository_handle.stage_entries(entries, cx)
1669 })?
1670 .await?;
1671 Ok(proto::Ack {})
1672 }
1673
1674 async fn handle_unstage(
1675 this: Entity<Self>,
1676 envelope: TypedEnvelope<proto::Unstage>,
1677 mut cx: AsyncApp,
1678 ) -> Result<proto::Ack> {
1679 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1680 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1681
1682 let entries = envelope
1683 .payload
1684 .paths
1685 .into_iter()
1686 .map(PathBuf::from)
1687 .map(RepoPath::new)
1688 .collect();
1689
1690 repository_handle
1691 .update(&mut cx, |repository_handle, cx| {
1692 repository_handle.unstage_entries(entries, cx)
1693 })?
1694 .await?;
1695
1696 Ok(proto::Ack {})
1697 }
1698
1699 async fn handle_set_index_text(
1700 this: Entity<Self>,
1701 envelope: TypedEnvelope<proto::SetIndexText>,
1702 mut cx: AsyncApp,
1703 ) -> Result<proto::Ack> {
1704 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1705 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1706 let repo_path = RepoPath::from_str(&envelope.payload.path);
1707
1708 repository_handle
1709 .update(&mut cx, |repository_handle, cx| {
1710 repository_handle.spawn_set_index_text_job(
1711 repo_path,
1712 envelope.payload.text,
1713 None,
1714 cx,
1715 )
1716 })?
1717 .await??;
1718 Ok(proto::Ack {})
1719 }
1720
1721 async fn handle_commit(
1722 this: Entity<Self>,
1723 envelope: TypedEnvelope<proto::Commit>,
1724 mut cx: AsyncApp,
1725 ) -> Result<proto::Ack> {
1726 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1727 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1728
1729 let askpass = if let Some(askpass_id) = envelope.payload.askpass_id {
1730 make_remote_delegate(
1731 this,
1732 envelope.payload.project_id,
1733 repository_id,
1734 askpass_id,
1735 &mut cx,
1736 )
1737 } else {
1738 AskPassDelegate::new_always_failing()
1739 };
1740
1741 let message = SharedString::from(envelope.payload.message);
1742 let name = envelope.payload.name.map(SharedString::from);
1743 let email = envelope.payload.email.map(SharedString::from);
1744 let options = envelope.payload.options.unwrap_or_default();
1745
1746 repository_handle
1747 .update(&mut cx, |repository_handle, cx| {
1748 repository_handle.commit(
1749 message,
1750 name.zip(email),
1751 CommitOptions {
1752 amend: options.amend,
1753 },
1754 askpass,
1755 cx,
1756 )
1757 })?
1758 .await??;
1759 Ok(proto::Ack {})
1760 }
1761
1762 async fn handle_get_remotes(
1763 this: Entity<Self>,
1764 envelope: TypedEnvelope<proto::GetRemotes>,
1765 mut cx: AsyncApp,
1766 ) -> Result<proto::GetRemotesResponse> {
1767 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1768 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1769
1770 let branch_name = envelope.payload.branch_name;
1771
1772 let remotes = repository_handle
1773 .update(&mut cx, |repository_handle, _| {
1774 repository_handle.get_remotes(branch_name)
1775 })?
1776 .await??;
1777
1778 Ok(proto::GetRemotesResponse {
1779 remotes: remotes
1780 .into_iter()
1781 .map(|remotes| proto::get_remotes_response::Remote {
1782 name: remotes.name.to_string(),
1783 })
1784 .collect::<Vec<_>>(),
1785 })
1786 }
1787
1788 async fn handle_get_branches(
1789 this: Entity<Self>,
1790 envelope: TypedEnvelope<proto::GitGetBranches>,
1791 mut cx: AsyncApp,
1792 ) -> Result<proto::GitBranchesResponse> {
1793 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1794 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1795
1796 let branches = repository_handle
1797 .update(&mut cx, |repository_handle, _| repository_handle.branches())?
1798 .await??;
1799
1800 Ok(proto::GitBranchesResponse {
1801 branches: branches
1802 .into_iter()
1803 .map(|branch| branch_to_proto(&branch))
1804 .collect::<Vec<_>>(),
1805 })
1806 }
1807 async fn handle_create_branch(
1808 this: Entity<Self>,
1809 envelope: TypedEnvelope<proto::GitCreateBranch>,
1810 mut cx: AsyncApp,
1811 ) -> Result<proto::Ack> {
1812 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1813 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1814 let branch_name = envelope.payload.branch_name;
1815
1816 repository_handle
1817 .update(&mut cx, |repository_handle, _| {
1818 repository_handle.create_branch(branch_name)
1819 })?
1820 .await??;
1821
1822 Ok(proto::Ack {})
1823 }
1824
1825 async fn handle_change_branch(
1826 this: Entity<Self>,
1827 envelope: TypedEnvelope<proto::GitChangeBranch>,
1828 mut cx: AsyncApp,
1829 ) -> Result<proto::Ack> {
1830 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1831 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1832 let branch_name = envelope.payload.branch_name;
1833
1834 repository_handle
1835 .update(&mut cx, |repository_handle, _| {
1836 repository_handle.change_branch(branch_name)
1837 })?
1838 .await??;
1839
1840 Ok(proto::Ack {})
1841 }
1842
1843 async fn handle_show(
1844 this: Entity<Self>,
1845 envelope: TypedEnvelope<proto::GitShow>,
1846 mut cx: AsyncApp,
1847 ) -> Result<proto::GitCommitDetails> {
1848 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1849 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1850
1851 let commit = repository_handle
1852 .update(&mut cx, |repository_handle, _| {
1853 repository_handle.show(envelope.payload.commit)
1854 })?
1855 .await??;
1856 Ok(proto::GitCommitDetails {
1857 sha: commit.sha.into(),
1858 message: commit.message.into(),
1859 commit_timestamp: commit.commit_timestamp,
1860 author_email: commit.author_email.into(),
1861 author_name: commit.author_name.into(),
1862 })
1863 }
1864
1865 async fn handle_load_commit_diff(
1866 this: Entity<Self>,
1867 envelope: TypedEnvelope<proto::LoadCommitDiff>,
1868 mut cx: AsyncApp,
1869 ) -> Result<proto::LoadCommitDiffResponse> {
1870 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1871 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1872
1873 let commit_diff = repository_handle
1874 .update(&mut cx, |repository_handle, _| {
1875 repository_handle.load_commit_diff(envelope.payload.commit)
1876 })?
1877 .await??;
1878 Ok(proto::LoadCommitDiffResponse {
1879 files: commit_diff
1880 .files
1881 .into_iter()
1882 .map(|file| proto::CommitFile {
1883 path: file.path.to_string(),
1884 old_text: file.old_text,
1885 new_text: file.new_text,
1886 })
1887 .collect(),
1888 })
1889 }
1890
1891 async fn handle_reset(
1892 this: Entity<Self>,
1893 envelope: TypedEnvelope<proto::GitReset>,
1894 mut cx: AsyncApp,
1895 ) -> Result<proto::Ack> {
1896 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1897 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1898
1899 let mode = match envelope.payload.mode() {
1900 git_reset::ResetMode::Soft => ResetMode::Soft,
1901 git_reset::ResetMode::Mixed => ResetMode::Mixed,
1902 };
1903
1904 repository_handle
1905 .update(&mut cx, |repository_handle, cx| {
1906 repository_handle.reset(envelope.payload.commit, mode, cx)
1907 })?
1908 .await??;
1909 Ok(proto::Ack {})
1910 }
1911
1912 async fn handle_checkout_files(
1913 this: Entity<Self>,
1914 envelope: TypedEnvelope<proto::GitCheckoutFiles>,
1915 mut cx: AsyncApp,
1916 ) -> Result<proto::Ack> {
1917 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1918 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1919 let paths = envelope
1920 .payload
1921 .paths
1922 .iter()
1923 .map(|s| RepoPath::from_str(s))
1924 .collect();
1925
1926 repository_handle
1927 .update(&mut cx, |repository_handle, cx| {
1928 repository_handle.checkout_files(&envelope.payload.commit, paths, cx)
1929 })?
1930 .await??;
1931 Ok(proto::Ack {})
1932 }
1933
1934 async fn handle_open_commit_message_buffer(
1935 this: Entity<Self>,
1936 envelope: TypedEnvelope<proto::OpenCommitMessageBuffer>,
1937 mut cx: AsyncApp,
1938 ) -> Result<proto::OpenBufferResponse> {
1939 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1940 let repository = Self::repository_for_request(&this, repository_id, &mut cx)?;
1941 let buffer = repository
1942 .update(&mut cx, |repository, cx| {
1943 repository.open_commit_buffer(None, this.read(cx).buffer_store.clone(), cx)
1944 })?
1945 .await?;
1946
1947 let buffer_id = buffer.read_with(&cx, |buffer, _| buffer.remote_id())?;
1948 this.update(&mut cx, |this, cx| {
1949 this.buffer_store.update(cx, |buffer_store, cx| {
1950 buffer_store
1951 .create_buffer_for_peer(
1952 &buffer,
1953 envelope.original_sender_id.unwrap_or(envelope.sender_id),
1954 cx,
1955 )
1956 .detach_and_log_err(cx);
1957 })
1958 })?;
1959
1960 Ok(proto::OpenBufferResponse {
1961 buffer_id: buffer_id.to_proto(),
1962 })
1963 }
1964
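    /// Handles an askpass request for a repository command that was initiated over
    /// RPC: looks up the delegate registered under `askpass_id`, prompts it for the
    /// secret, and returns the response to the requesting side.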
1965 async fn handle_askpass(
1966 this: Entity<Self>,
1967 envelope: TypedEnvelope<proto::AskPassRequest>,
1968 mut cx: AsyncApp,
1969 ) -> Result<proto::AskPassResponse> {
1970 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1971 let repository = Self::repository_for_request(&this, repository_id, &mut cx)?;
1972
1973 let delegates = cx.update(|cx| repository.read(cx).askpass_delegates.clone())?;
1974 let Some(mut askpass) = delegates.lock().remove(&envelope.payload.askpass_id) else {
1975 debug_panic!("no askpass found");
1976 anyhow::bail!("no askpass found");
1977 };
1978
1979 let response = askpass.ask_password(envelope.payload.prompt).await?;
1980
1981 delegates
1982 .lock()
1983 .insert(envelope.payload.askpass_id, askpass);
1984
1985 Ok(proto::AskPassResponse { response })
1986 }
1987
1988 async fn handle_check_for_pushed_commits(
1989 this: Entity<Self>,
1990 envelope: TypedEnvelope<proto::CheckForPushedCommits>,
1991 mut cx: AsyncApp,
1992 ) -> Result<proto::CheckForPushedCommitsResponse> {
1993 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1994 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1995
1996 let branches = repository_handle
1997 .update(&mut cx, |repository_handle, _| {
1998 repository_handle.check_for_pushed_commits()
1999 })?
2000 .await??;
2001 Ok(proto::CheckForPushedCommitsResponse {
2002 pushed_to: branches
2003 .into_iter()
2004 .map(|commit| commit.to_string())
2005 .collect(),
2006 })
2007 }
2008
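    /// Computes a repository-wide diff of the requested kind (HEAD-to-index or
    /// HEAD-to-worktree) and returns it, truncated to roughly one megabyte of text.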
2009 async fn handle_git_diff(
2010 this: Entity<Self>,
2011 envelope: TypedEnvelope<proto::GitDiff>,
2012 mut cx: AsyncApp,
2013 ) -> Result<proto::GitDiffResponse> {
2014 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2015 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2016 let diff_type = match envelope.payload.diff_type() {
2017 proto::git_diff::DiffType::HeadToIndex => DiffType::HeadToIndex,
2018 proto::git_diff::DiffType::HeadToWorktree => DiffType::HeadToWorktree,
2019 };
2020
2021 let mut diff = repository_handle
2022 .update(&mut cx, |repository_handle, cx| {
2023 repository_handle.diff(diff_type, cx)
2024 })?
2025 .await??;
2026 const ONE_MB: usize = 1_000_000;
2027 if diff.len() > ONE_MB {
2028 diff = diff.chars().take(ONE_MB).collect()
2029 }
2030
2031 Ok(proto::GitDiffResponse { diff })
2032 }
2033
2034 async fn handle_open_unstaged_diff(
2035 this: Entity<Self>,
2036 request: TypedEnvelope<proto::OpenUnstagedDiff>,
2037 mut cx: AsyncApp,
2038 ) -> Result<proto::OpenUnstagedDiffResponse> {
2039 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2040 let diff = this
2041 .update(&mut cx, |this, cx| {
2042 let buffer = this.buffer_store.read(cx).get(buffer_id)?;
2043 Some(this.open_unstaged_diff(buffer, cx))
2044 })?
2045 .context("missing buffer")?
2046 .await?;
2047 this.update(&mut cx, |this, _| {
2048 let shared_diffs = this
2049 .shared_diffs
2050 .entry(request.original_sender_id.unwrap_or(request.sender_id))
2051 .or_default();
2052 shared_diffs.entry(buffer_id).or_default().unstaged = Some(diff.clone());
2053 })?;
2054 let staged_text = diff.read_with(&cx, |diff, _| diff.base_text_string())?;
2055 Ok(proto::OpenUnstagedDiffResponse { staged_text })
2056 }
2057
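    /// Opens the uncommitted diff for a buffer on behalf of a collaborator, records
    /// it in `shared_diffs`, and replies with the committed and staged base texts,
    /// using `Mode::IndexMatchesHead` to avoid resending the staged text when the
    /// index base matches HEAD.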
2058 async fn handle_open_uncommitted_diff(
2059 this: Entity<Self>,
2060 request: TypedEnvelope<proto::OpenUncommittedDiff>,
2061 mut cx: AsyncApp,
2062 ) -> Result<proto::OpenUncommittedDiffResponse> {
2063 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2064 let diff = this
2065 .update(&mut cx, |this, cx| {
2066 let buffer = this.buffer_store.read(cx).get(buffer_id)?;
2067 Some(this.open_uncommitted_diff(buffer, cx))
2068 })?
2069 .context("missing buffer")?
2070 .await?;
2071 this.update(&mut cx, |this, _| {
2072 let shared_diffs = this
2073 .shared_diffs
2074 .entry(request.original_sender_id.unwrap_or(request.sender_id))
2075 .or_default();
2076 shared_diffs.entry(buffer_id).or_default().uncommitted = Some(diff.clone());
2077 })?;
2078 diff.read_with(&cx, |diff, cx| {
2079 use proto::open_uncommitted_diff_response::Mode;
2080
2081 let unstaged_diff = diff.secondary_diff();
2082 let index_snapshot = unstaged_diff.and_then(|diff| {
2083 let diff = diff.read(cx);
2084 diff.base_text_exists().then(|| diff.base_text())
2085 });
2086
2087 let mode;
2088 let staged_text;
2089 let committed_text;
2090 if diff.base_text_exists() {
2091 let committed_snapshot = diff.base_text();
2092 committed_text = Some(committed_snapshot.text());
2093 if let Some(index_text) = index_snapshot {
2094 if index_text.remote_id() == committed_snapshot.remote_id() {
2095 mode = Mode::IndexMatchesHead;
2096 staged_text = None;
2097 } else {
2098 mode = Mode::IndexAndHead;
2099 staged_text = Some(index_text.text());
2100 }
2101 } else {
2102 mode = Mode::IndexAndHead;
2103 staged_text = None;
2104 }
2105 } else {
2106 mode = Mode::IndexAndHead;
2107 committed_text = None;
2108 staged_text = index_snapshot.as_ref().map(|buffer| buffer.text());
2109 }
2110
2111 proto::OpenUncommittedDiffResponse {
2112 committed_text,
2113 staged_text,
2114 mode: mode.into(),
2115 }
2116 })
2117 }
2118
2119 async fn handle_update_diff_bases(
2120 this: Entity<Self>,
2121 request: TypedEnvelope<proto::UpdateDiffBases>,
2122 mut cx: AsyncApp,
2123 ) -> Result<()> {
2124 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2125 this.update(&mut cx, |this, cx| {
2126 if let Some(diff_state) = this.diffs.get_mut(&buffer_id) {
2127 if let Some(buffer) = this.buffer_store.read(cx).get(buffer_id) {
2128 let buffer = buffer.read(cx).text_snapshot();
2129 diff_state.update(cx, |diff_state, cx| {
2130 diff_state.handle_base_texts_updated(buffer, request.payload, cx);
2131 })
2132 }
2133 }
2134 })
2135 }
2136
2137 async fn handle_blame_buffer(
2138 this: Entity<Self>,
2139 envelope: TypedEnvelope<proto::BlameBuffer>,
2140 mut cx: AsyncApp,
2141 ) -> Result<proto::BlameBufferResponse> {
2142 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2143 let version = deserialize_version(&envelope.payload.version);
2144 let buffer = this.read_with(&cx, |this, cx| {
2145 this.buffer_store.read(cx).get_existing(buffer_id)
2146 })??;
2147 buffer
2148 .update(&mut cx, |buffer, _| {
2149 buffer.wait_for_version(version.clone())
2150 })?
2151 .await?;
2152 let blame = this
2153 .update(&mut cx, |this, cx| {
2154 this.blame_buffer(&buffer, Some(version), cx)
2155 })?
2156 .await?;
2157 Ok(serialize_blame_buffer_response(blame))
2158 }
2159
2160 async fn handle_get_permalink_to_line(
2161 this: Entity<Self>,
2162 envelope: TypedEnvelope<proto::GetPermalinkToLine>,
2163 mut cx: AsyncApp,
2164 ) -> Result<proto::GetPermalinkToLineResponse> {
2165 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2167 let selection = {
2168 let proto_selection = envelope
2169 .payload
2170 .selection
                .context("no selection defined to get permalink for")?;
2172 proto_selection.start as u32..proto_selection.end as u32
2173 };
2174 let buffer = this.read_with(&cx, |this, cx| {
2175 this.buffer_store.read(cx).get_existing(buffer_id)
2176 })??;
2177 let permalink = this
2178 .update(&mut cx, |this, cx| {
2179 this.get_permalink_to_line(&buffer, selection, cx)
2180 })?
2181 .await?;
2182 Ok(proto::GetPermalinkToLineResponse {
2183 permalink: permalink.to_string(),
2184 })
2185 }
2186
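    /// Resolves the repository entity referenced by an incoming request, failing if
    /// the `RepositoryId` is not known to this `GitStore`.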
2187 fn repository_for_request(
2188 this: &Entity<Self>,
2189 id: RepositoryId,
2190 cx: &mut AsyncApp,
2191 ) -> Result<Entity<Repository>> {
2192 this.read_with(cx, |this, _| {
2193 this.repositories
2194 .get(&id)
2195 .context("missing repository handle")
2196 .cloned()
2197 })?
2198 }
2199
2200 pub fn repo_snapshots(&self, cx: &App) -> HashMap<RepositoryId, RepositorySnapshot> {
2201 self.repositories
2202 .iter()
2203 .map(|(id, repo)| (*id, repo.read(cx).snapshot.clone()))
2204 .collect()
2205 }
2206}
2207
2208impl BufferGitState {
2209 fn new(_git_store: WeakEntity<GitStore>) -> Self {
2210 Self {
2211 unstaged_diff: Default::default(),
2212 uncommitted_diff: Default::default(),
2213 recalculate_diff_task: Default::default(),
2214 language: Default::default(),
2215 language_registry: Default::default(),
2216 recalculating_tx: postage::watch::channel_with(false).0,
2217 hunk_staging_operation_count: 0,
2218 hunk_staging_operation_count_as_of_write: 0,
2219 head_text: Default::default(),
2220 index_text: Default::default(),
2221 head_changed: Default::default(),
2222 index_changed: Default::default(),
2223 language_changed: Default::default(),
2224 conflict_updated_futures: Default::default(),
2225 conflict_set: Default::default(),
2226 reparse_conflict_markers_task: Default::default(),
2227 }
2228 }
2229
2230 fn buffer_language_changed(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) {
2231 self.language = buffer.read(cx).language().cloned();
2232 self.language_changed = true;
2233 let _ = self.recalculate_diffs(buffer.read(cx).text_snapshot(), cx);
2234 }
2235
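    /// Re-parses conflict markers for the given buffer snapshot on a background task
    /// and applies the result to the tracked `ConflictSet`. Parsing is only scheduled
    /// when the buffer previously contained conflicts; the returned receiver resolves
    /// once the conflict set has been updated.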
2236 fn reparse_conflict_markers(
2237 &mut self,
2238 buffer: text::BufferSnapshot,
2239 cx: &mut Context<Self>,
2240 ) -> oneshot::Receiver<()> {
2241 let (tx, rx) = oneshot::channel();
2242
2243 let Some(conflict_set) = self
2244 .conflict_set
2245 .as_ref()
2246 .and_then(|conflict_set| conflict_set.upgrade())
2247 else {
2248 return rx;
2249 };
2250
2251 let old_snapshot = conflict_set.read_with(cx, |conflict_set, _| {
2252 if conflict_set.has_conflict {
2253 Some(conflict_set.snapshot())
2254 } else {
2255 None
2256 }
2257 });
2258
2259 if let Some(old_snapshot) = old_snapshot {
2260 self.conflict_updated_futures.push(tx);
2261 self.reparse_conflict_markers_task = Some(cx.spawn(async move |this, cx| {
2262 let (snapshot, changed_range) = cx
2263 .background_spawn(async move {
2264 let new_snapshot = ConflictSet::parse(&buffer);
2265 let changed_range = old_snapshot.compare(&new_snapshot, &buffer);
2266 (new_snapshot, changed_range)
2267 })
2268 .await;
2269 this.update(cx, |this, cx| {
2270 if let Some(conflict_set) = &this.conflict_set {
2271 conflict_set
2272 .update(cx, |conflict_set, cx| {
2273 conflict_set.set_snapshot(snapshot, changed_range, cx);
2274 })
2275 .ok();
2276 }
2277 let futures = std::mem::take(&mut this.conflict_updated_futures);
2278 for tx in futures {
2279 tx.send(()).ok();
2280 }
2281 })
2282 }))
2283 }
2284
2285 rx
2286 }
2287
2288 fn unstaged_diff(&self) -> Option<Entity<BufferDiff>> {
2289 self.unstaged_diff.as_ref().and_then(|set| set.upgrade())
2290 }
2291
2292 fn uncommitted_diff(&self) -> Option<Entity<BufferDiff>> {
2293 self.uncommitted_diff.as_ref().and_then(|set| set.upgrade())
2294 }
2295
2296 fn handle_base_texts_updated(
2297 &mut self,
2298 buffer: text::BufferSnapshot,
2299 message: proto::UpdateDiffBases,
2300 cx: &mut Context<Self>,
2301 ) {
2302 use proto::update_diff_bases::Mode;
2303
2304 let Some(mode) = Mode::from_i32(message.mode) else {
2305 return;
2306 };
2307
2308 let diff_bases_change = match mode {
2309 Mode::HeadOnly => DiffBasesChange::SetHead(message.committed_text),
2310 Mode::IndexOnly => DiffBasesChange::SetIndex(message.staged_text),
2311 Mode::IndexMatchesHead => DiffBasesChange::SetBoth(message.committed_text),
2312 Mode::IndexAndHead => DiffBasesChange::SetEach {
2313 index: message.staged_text,
2314 head: message.committed_text,
2315 },
2316 };
2317
2318 self.diff_bases_changed(buffer, Some(diff_bases_change), cx);
2319 }
2320
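    /// Returns a future that resolves once the in-flight diff recalculation (if any)
    /// has finished, or `None` if no recalculation is currently running.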
2321 pub fn wait_for_recalculation(&mut self) -> Option<impl Future<Output = ()> + use<>> {
2322 if *self.recalculating_tx.borrow() {
2323 let mut rx = self.recalculating_tx.subscribe();
2324 return Some(async move {
2325 loop {
2326 let is_recalculating = rx.recv().await;
2327 if is_recalculating != Some(true) {
2328 break;
2329 }
2330 }
2331 });
2332 } else {
2333 None
2334 }
2335 }
2336
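    /// Records new head and/or index base texts (normalizing their line endings) and
    /// kicks off a diff recalculation against the given buffer snapshot.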
2337 fn diff_bases_changed(
2338 &mut self,
2339 buffer: text::BufferSnapshot,
2340 diff_bases_change: Option<DiffBasesChange>,
2341 cx: &mut Context<Self>,
2342 ) {
2343 match diff_bases_change {
2344 Some(DiffBasesChange::SetIndex(index)) => {
2345 self.index_text = index.map(|mut index| {
2346 text::LineEnding::normalize(&mut index);
2347 Arc::new(index)
2348 });
2349 self.index_changed = true;
2350 }
2351 Some(DiffBasesChange::SetHead(head)) => {
2352 self.head_text = head.map(|mut head| {
2353 text::LineEnding::normalize(&mut head);
2354 Arc::new(head)
2355 });
2356 self.head_changed = true;
2357 }
2358 Some(DiffBasesChange::SetBoth(text)) => {
2359 let text = text.map(|mut text| {
2360 text::LineEnding::normalize(&mut text);
2361 Arc::new(text)
2362 });
2363 self.head_text = text.clone();
2364 self.index_text = text;
2365 self.head_changed = true;
2366 self.index_changed = true;
2367 }
2368 Some(DiffBasesChange::SetEach { index, head }) => {
2369 self.index_text = index.map(|mut index| {
2370 text::LineEnding::normalize(&mut index);
2371 Arc::new(index)
2372 });
2373 self.index_changed = true;
2374 self.head_text = head.map(|mut head| {
2375 text::LineEnding::normalize(&mut head);
2376 Arc::new(head)
2377 });
2378 self.head_changed = true;
2379 }
2380 None => {}
2381 }
2382
2383 self.recalculate_diffs(buffer, cx)
2384 }
2385
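    /// Recomputes the unstaged and uncommitted diffs for this buffer on a background
    /// task, reusing the unstaged diff for the uncommitted one when the index matches
    /// HEAD. The update is abandoned if additional hunk staging operations have been
    /// initiated since the index write this recalculation corresponds to; a later
    /// recalculation will pick up the settled state.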
2386 fn recalculate_diffs(&mut self, buffer: text::BufferSnapshot, cx: &mut Context<Self>) {
2387 *self.recalculating_tx.borrow_mut() = true;
2388
2389 let language = self.language.clone();
2390 let language_registry = self.language_registry.clone();
2391 let unstaged_diff = self.unstaged_diff();
2392 let uncommitted_diff = self.uncommitted_diff();
2393 let head = self.head_text.clone();
2394 let index = self.index_text.clone();
2395 let index_changed = self.index_changed;
2396 let head_changed = self.head_changed;
2397 let language_changed = self.language_changed;
2398 let prev_hunk_staging_operation_count = self.hunk_staging_operation_count_as_of_write;
2399 let index_matches_head = match (self.index_text.as_ref(), self.head_text.as_ref()) {
2400 (Some(index), Some(head)) => Arc::ptr_eq(index, head),
2401 (None, None) => true,
2402 _ => false,
2403 };
2404 self.recalculate_diff_task = Some(cx.spawn(async move |this, cx| {
2405 log::debug!(
2406 "start recalculating diffs for buffer {}",
2407 buffer.remote_id()
2408 );
2409
2410 let mut new_unstaged_diff = None;
2411 if let Some(unstaged_diff) = &unstaged_diff {
2412 new_unstaged_diff = Some(
2413 BufferDiff::update_diff(
2414 unstaged_diff.clone(),
2415 buffer.clone(),
2416 index,
2417 index_changed,
2418 language_changed,
2419 language.clone(),
2420 language_registry.clone(),
2421 cx,
2422 )
2423 .await?,
2424 );
2425 }
2426
2427 let mut new_uncommitted_diff = None;
2428 if let Some(uncommitted_diff) = &uncommitted_diff {
2429 new_uncommitted_diff = if index_matches_head {
2430 new_unstaged_diff.clone()
2431 } else {
2432 Some(
2433 BufferDiff::update_diff(
2434 uncommitted_diff.clone(),
2435 buffer.clone(),
2436 head,
2437 head_changed,
2438 language_changed,
2439 language.clone(),
2440 language_registry.clone(),
2441 cx,
2442 )
2443 .await?,
2444 )
2445 }
2446 }
2447
2448 let cancel = this.update(cx, |this, _| {
2449 // This checks whether all pending stage/unstage operations
2450 // have quiesced (i.e. both the corresponding write and the
2451 // read of that write have completed). If not, then we cancel
2452 // this recalculation attempt to avoid invalidating pending
2453 // state too quickly; another recalculation will come along
2454 // later and clear the pending state once the state of the index has settled.
2455 if this.hunk_staging_operation_count > prev_hunk_staging_operation_count {
2456 *this.recalculating_tx.borrow_mut() = false;
2457 true
2458 } else {
2459 false
2460 }
2461 })?;
2462 if cancel {
                log::debug!(
                    "aborting recalculating diffs for buffer {} due to subsequent hunk operations",
                    buffer.remote_id()
                );
2470 return Ok(());
2471 }
2472
2473 let unstaged_changed_range = if let Some((unstaged_diff, new_unstaged_diff)) =
2474 unstaged_diff.as_ref().zip(new_unstaged_diff.clone())
2475 {
2476 unstaged_diff.update(cx, |diff, cx| {
2477 if language_changed {
2478 diff.language_changed(cx);
2479 }
2480 diff.set_snapshot(new_unstaged_diff, &buffer, cx)
2481 })?
2482 } else {
2483 None
2484 };
2485
2486 if let Some((uncommitted_diff, new_uncommitted_diff)) =
2487 uncommitted_diff.as_ref().zip(new_uncommitted_diff.clone())
2488 {
2489 uncommitted_diff.update(cx, |diff, cx| {
2490 if language_changed {
2491 diff.language_changed(cx);
2492 }
2493 diff.set_snapshot_with_secondary(
2494 new_uncommitted_diff,
2495 &buffer,
2496 unstaged_changed_range,
2497 true,
2498 cx,
2499 );
2500 })?;
2501 }
2502
2503 log::debug!(
2504 "finished recalculating diffs for buffer {}",
2505 buffer.remote_id()
2506 );
2507
2508 if let Some(this) = this.upgrade() {
2509 this.update(cx, |this, _| {
2510 this.index_changed = false;
2511 this.head_changed = false;
2512 this.language_changed = false;
2513 *this.recalculating_tx.borrow_mut() = false;
2514 })?;
2515 }
2516
2517 Ok(())
2518 }));
2519 }
2520}
2521
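/// Builds an `AskPassDelegate` that forwards password prompts from a local git job to
/// the downstream client that initiated the command, identified by `askpass_id`.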
2522fn make_remote_delegate(
2523 this: Entity<GitStore>,
2524 project_id: u64,
2525 repository_id: RepositoryId,
2526 askpass_id: u64,
2527 cx: &mut AsyncApp,
2528) -> AskPassDelegate {
2529 AskPassDelegate::new(cx, move |prompt, tx, cx| {
2530 this.update(cx, |this, cx| {
2531 let Some((client, _)) = this.downstream_client() else {
2532 return;
2533 };
2534 let response = client.request(proto::AskPassRequest {
2535 project_id,
2536 repository_id: repository_id.to_proto(),
2537 askpass_id,
2538 prompt,
2539 });
2540 cx.spawn(async move |_, _| {
2541 tx.send(response.await?.response).ok();
2542 anyhow::Ok(())
2543 })
2544 .detach_and_log_err(cx);
2545 })
2546 .log_err();
2547 })
2548}
2549
2550impl RepositoryId {
2551 pub fn to_proto(self) -> u64 {
2552 self.0
2553 }
2554
2555 pub fn from_proto(id: u64) -> Self {
2556 RepositoryId(id)
2557 }
2558}
2559
2560impl RepositorySnapshot {
2561 fn empty(id: RepositoryId, work_directory_abs_path: Arc<Path>) -> Self {
2562 Self {
2563 id,
2564 statuses_by_path: Default::default(),
2565 work_directory_abs_path,
2566 branch: None,
2567 head_commit: None,
2568 scan_id: 0,
2569 merge: Default::default(),
2570 }
2571 }
2572
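    /// Builds the `UpdateRepository` message that seeds a newly connected collaborator
    /// with this repository's full state: branch, head commit, every tracked status
    /// entry, and the current merge conflicts.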
2573 fn initial_update(&self, project_id: u64) -> proto::UpdateRepository {
2574 proto::UpdateRepository {
2575 branch_summary: self.branch.as_ref().map(branch_to_proto),
2576 head_commit_details: self.head_commit.as_ref().map(commit_details_to_proto),
2577 updated_statuses: self
2578 .statuses_by_path
2579 .iter()
2580 .map(|entry| entry.to_proto())
2581 .collect(),
2582 removed_statuses: Default::default(),
2583 current_merge_conflicts: self
2584 .merge
2585 .conflicted_paths
2586 .iter()
2587 .map(|repo_path| repo_path.to_proto())
2588 .collect(),
2589 project_id,
2590 id: self.id.to_proto(),
2591 abs_path: self.work_directory_abs_path.to_proto(),
2592 entry_ids: vec![self.id.to_proto()],
2593 scan_id: self.scan_id,
2594 is_last_update: true,
2595 }
2596 }
2597
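    /// Diffs this snapshot against `old` and builds an incremental `UpdateRepository`
    /// message containing only the status entries that were added, changed, or
    /// removed.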
2598 fn build_update(&self, old: &Self, project_id: u64) -> proto::UpdateRepository {
2599 let mut updated_statuses: Vec<proto::StatusEntry> = Vec::new();
2600 let mut removed_statuses: Vec<String> = Vec::new();
2601
2602 let mut new_statuses = self.statuses_by_path.iter().peekable();
2603 let mut old_statuses = old.statuses_by_path.iter().peekable();
2604
2605 let mut current_new_entry = new_statuses.next();
2606 let mut current_old_entry = old_statuses.next();
2607 loop {
2608 match (current_new_entry, current_old_entry) {
2609 (Some(new_entry), Some(old_entry)) => {
2610 match new_entry.repo_path.cmp(&old_entry.repo_path) {
2611 Ordering::Less => {
2612 updated_statuses.push(new_entry.to_proto());
2613 current_new_entry = new_statuses.next();
2614 }
2615 Ordering::Equal => {
2616 if new_entry.status != old_entry.status {
2617 updated_statuses.push(new_entry.to_proto());
2618 }
2619 current_old_entry = old_statuses.next();
2620 current_new_entry = new_statuses.next();
2621 }
2622 Ordering::Greater => {
2623 removed_statuses.push(old_entry.repo_path.as_ref().to_proto());
2624 current_old_entry = old_statuses.next();
2625 }
2626 }
2627 }
2628 (None, Some(old_entry)) => {
2629 removed_statuses.push(old_entry.repo_path.as_ref().to_proto());
2630 current_old_entry = old_statuses.next();
2631 }
2632 (Some(new_entry), None) => {
2633 updated_statuses.push(new_entry.to_proto());
2634 current_new_entry = new_statuses.next();
2635 }
2636 (None, None) => break,
2637 }
2638 }
2639
2640 proto::UpdateRepository {
2641 branch_summary: self.branch.as_ref().map(branch_to_proto),
2642 head_commit_details: self.head_commit.as_ref().map(commit_details_to_proto),
2643 updated_statuses,
2644 removed_statuses,
2645 current_merge_conflicts: self
2646 .merge
2647 .conflicted_paths
2648 .iter()
2649 .map(|path| path.as_ref().to_proto())
2650 .collect(),
2651 project_id,
2652 id: self.id.to_proto(),
2653 abs_path: self.work_directory_abs_path.to_proto(),
2654 entry_ids: vec![],
2655 scan_id: self.scan_id,
2656 is_last_update: true,
2657 }
2658 }
2659
2660 pub fn status(&self) -> impl Iterator<Item = StatusEntry> + '_ {
2661 self.statuses_by_path.iter().cloned()
2662 }
2663
2664 pub fn status_summary(&self) -> GitSummary {
2665 self.statuses_by_path.summary().item_summary
2666 }
2667
2668 pub fn status_for_path(&self, path: &RepoPath) -> Option<StatusEntry> {
2669 self.statuses_by_path
2670 .get(&PathKey(path.0.clone()), &())
2671 .cloned()
2672 }
2673
2674 pub fn abs_path_to_repo_path(&self, abs_path: &Path) -> Option<RepoPath> {
2675 abs_path
2676 .strip_prefix(&self.work_directory_abs_path)
2677 .map(RepoPath::from)
2678 .ok()
2679 }
2680
    pub fn had_conflict_on_last_merge_head_change(&self, repo_path: &RepoPath) -> bool {
        self.merge.conflicted_paths.contains(repo_path)
    }

    pub fn has_conflict(&self, repo_path: &RepoPath) -> bool {
        let had_conflict_on_last_merge_head_change =
            self.had_conflict_on_last_merge_head_change(repo_path);
        let has_conflict_currently = self
            .status_for_path(repo_path)
            .map_or(false, |entry| entry.status.is_conflicted());
        had_conflict_on_last_merge_head_change || has_conflict_currently
    }
2693
2694 /// This is the name that will be displayed in the repository selector for this repository.
2695 pub fn display_name(&self) -> SharedString {
2696 self.work_directory_abs_path
2697 .file_name()
2698 .unwrap_or_default()
2699 .to_string_lossy()
2700 .to_string()
2701 .into()
2702 }
2703}
2704
2705impl MergeDetails {
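    /// Reads the repository's merge-related refs (MERGE_HEAD, CHERRY_PICK_HEAD, and
    /// friends) and recomputes the set of conflicted paths when those refs have
    /// changed, returning the new details along with whether the merge heads changed.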
2706 async fn load(
2707 backend: &Arc<dyn GitRepository>,
2708 status: &SumTree<StatusEntry>,
2709 prev_snapshot: &RepositorySnapshot,
2710 ) -> Result<(MergeDetails, bool)> {
2711 log::debug!("load merge details");
2712 let message = backend.merge_message().await;
2713 let heads = backend
2714 .revparse_batch(vec![
2715 "MERGE_HEAD".into(),
2716 "CHERRY_PICK_HEAD".into(),
2717 "REBASE_HEAD".into(),
2718 "REVERT_HEAD".into(),
2719 "APPLY_HEAD".into(),
2720 ])
2721 .await
2722 .log_err()
2723 .unwrap_or_default()
2724 .into_iter()
2725 .map(|opt| opt.map(SharedString::from))
2726 .collect::<Vec<_>>();
2727 let merge_heads_changed = heads != prev_snapshot.merge.heads;
2728 let conflicted_paths = if merge_heads_changed {
2729 let current_conflicted_paths = TreeSet::from_ordered_entries(
2730 status
2731 .iter()
2732 .filter(|entry| entry.status.is_conflicted())
2733 .map(|entry| entry.repo_path.clone()),
2734 );
2735
2736 // It can happen that we run a scan while a lengthy merge is in progress
2737 // that will eventually result in conflicts, but before those conflicts
2738 // are reported by `git status`. Since for the moment we only care about
2739 // the merge heads state for the purposes of tracking conflicts, don't update
2740 // this state until we see some conflicts.
2741 if heads.iter().any(Option::is_some)
2742 && !prev_snapshot.merge.heads.iter().any(Option::is_some)
2743 && current_conflicted_paths.is_empty()
2744 {
2745 log::debug!("not updating merge heads because no conflicts found");
2746 return Ok((
2747 MergeDetails {
2748 message: message.map(SharedString::from),
2749 ..prev_snapshot.merge.clone()
2750 },
2751 false,
2752 ));
2753 }
2754
2755 current_conflicted_paths
2756 } else {
2757 prev_snapshot.merge.conflicted_paths.clone()
2758 };
2759 let details = MergeDetails {
2760 conflicted_paths,
2761 message: message.map(SharedString::from),
2762 heads,
2763 };
2764 Ok((details, merge_heads_changed))
2765 }
2766}
2767
2768impl Repository {
2769 pub fn snapshot(&self) -> RepositorySnapshot {
2770 self.snapshot.clone()
2771 }
2772
2773 fn local(
2774 id: RepositoryId,
2775 work_directory_abs_path: Arc<Path>,
2776 dot_git_abs_path: Arc<Path>,
2777 repository_dir_abs_path: Arc<Path>,
2778 common_dir_abs_path: Arc<Path>,
2779 project_environment: WeakEntity<ProjectEnvironment>,
2780 fs: Arc<dyn Fs>,
2781 git_store: WeakEntity<GitStore>,
2782 cx: &mut Context<Self>,
2783 ) -> Self {
2784 let snapshot = RepositorySnapshot::empty(id, work_directory_abs_path.clone());
2785 Repository {
2786 this: cx.weak_entity(),
2787 git_store,
2788 snapshot,
2789 commit_message_buffer: None,
2790 askpass_delegates: Default::default(),
2791 paths_needing_status_update: Default::default(),
2792 latest_askpass_id: 0,
2793 job_sender: Repository::spawn_local_git_worker(
2794 work_directory_abs_path,
2795 dot_git_abs_path,
2796 repository_dir_abs_path,
2797 common_dir_abs_path,
2798 project_environment,
2799 fs,
2800 cx,
2801 ),
2802 job_id: 0,
2803 active_jobs: Default::default(),
2804 }
2805 }
2806
2807 fn remote(
2808 id: RepositoryId,
2809 work_directory_abs_path: Arc<Path>,
2810 project_id: ProjectId,
2811 client: AnyProtoClient,
2812 git_store: WeakEntity<GitStore>,
2813 cx: &mut Context<Self>,
2814 ) -> Self {
2815 let snapshot = RepositorySnapshot::empty(id, work_directory_abs_path);
2816 Self {
2817 this: cx.weak_entity(),
2818 snapshot,
2819 commit_message_buffer: None,
2820 git_store,
2821 paths_needing_status_update: Default::default(),
2822 job_sender: Self::spawn_remote_git_worker(project_id, client, cx),
2823 askpass_delegates: Default::default(),
2824 latest_askpass_id: 0,
2825 active_jobs: Default::default(),
2826 job_id: 0,
2827 }
2828 }
2829
2830 pub fn git_store(&self) -> Option<Entity<GitStore>> {
2831 self.git_store.upgrade()
2832 }
2833
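    /// Reloads the index and HEAD base texts for every open buffer that belongs to
    /// this repository, forwards any changes to downstream collaborators, and
    /// triggers diff recalculation for the affected buffers.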
2834 fn reload_buffer_diff_bases(&mut self, cx: &mut Context<Self>) {
2835 let this = cx.weak_entity();
2836 let git_store = self.git_store.clone();
2837 let _ = self.send_keyed_job(
2838 Some(GitJobKey::ReloadBufferDiffBases),
2839 None,
2840 |state, mut cx| async move {
2841 let RepositoryState::Local { backend, .. } = state else {
2842 log::error!("tried to recompute diffs for a non-local repository");
2843 return Ok(());
2844 };
2845
2846 let Some(this) = this.upgrade() else {
2847 return Ok(());
2848 };
2849
2850 let repo_diff_state_updates = this.update(&mut cx, |this, cx| {
2851 git_store.update(cx, |git_store, cx| {
2852 git_store
2853 .diffs
2854 .iter()
2855 .filter_map(|(buffer_id, diff_state)| {
2856 let buffer_store = git_store.buffer_store.read(cx);
2857 let buffer = buffer_store.get(*buffer_id)?;
2858 let file = File::from_dyn(buffer.read(cx).file())?;
2859 let abs_path =
2860 file.worktree.read(cx).absolutize(&file.path).ok()?;
2861 let repo_path = this.abs_path_to_repo_path(&abs_path)?;
2862 log::debug!(
2863 "start reload diff bases for repo path {}",
2864 repo_path.0.display()
2865 );
2866 diff_state.update(cx, |diff_state, _| {
2867 let has_unstaged_diff = diff_state
2868 .unstaged_diff
2869 .as_ref()
2870 .is_some_and(|diff| diff.is_upgradable());
2871 let has_uncommitted_diff = diff_state
2872 .uncommitted_diff
2873 .as_ref()
2874 .is_some_and(|set| set.is_upgradable());
2875
2876 Some((
2877 buffer,
2878 repo_path,
2879 has_unstaged_diff.then(|| diff_state.index_text.clone()),
2880 has_uncommitted_diff.then(|| diff_state.head_text.clone()),
2881 ))
2882 })
2883 })
2884 .collect::<Vec<_>>()
2885 })
2886 })??;
2887
2888 let buffer_diff_base_changes = cx
2889 .background_spawn(async move {
2890 let mut changes = Vec::new();
2891 for (buffer, repo_path, current_index_text, current_head_text) in
2892 &repo_diff_state_updates
2893 {
2894 let index_text = if current_index_text.is_some() {
2895 backend.load_index_text(repo_path.clone()).await
2896 } else {
2897 None
2898 };
2899 let head_text = if current_head_text.is_some() {
2900 backend.load_committed_text(repo_path.clone()).await
2901 } else {
2902 None
2903 };
2904
2905 let change =
2906 match (current_index_text.as_ref(), current_head_text.as_ref()) {
2907 (Some(current_index), Some(current_head)) => {
2908 let index_changed =
2909 index_text.as_ref() != current_index.as_deref();
2910 let head_changed =
2911 head_text.as_ref() != current_head.as_deref();
2912 if index_changed && head_changed {
2913 if index_text == head_text {
2914 Some(DiffBasesChange::SetBoth(head_text))
2915 } else {
2916 Some(DiffBasesChange::SetEach {
2917 index: index_text,
2918 head: head_text,
2919 })
2920 }
2921 } else if index_changed {
2922 Some(DiffBasesChange::SetIndex(index_text))
2923 } else if head_changed {
2924 Some(DiffBasesChange::SetHead(head_text))
2925 } else {
2926 None
2927 }
2928 }
2929 (Some(current_index), None) => {
2930 let index_changed =
2931 index_text.as_ref() != current_index.as_deref();
2932 index_changed
2933 .then_some(DiffBasesChange::SetIndex(index_text))
2934 }
2935 (None, Some(current_head)) => {
2936 let head_changed =
2937 head_text.as_ref() != current_head.as_deref();
2938 head_changed.then_some(DiffBasesChange::SetHead(head_text))
2939 }
2940 (None, None) => None,
2941 };
2942
2943 changes.push((buffer.clone(), change))
2944 }
2945 changes
2946 })
2947 .await;
2948
2949 git_store.update(&mut cx, |git_store, cx| {
2950 for (buffer, diff_bases_change) in buffer_diff_base_changes {
2951 let buffer_snapshot = buffer.read(cx).text_snapshot();
2952 let buffer_id = buffer_snapshot.remote_id();
2953 let Some(diff_state) = git_store.diffs.get(&buffer_id) else {
2954 continue;
2955 };
2956
2957 let downstream_client = git_store.downstream_client();
2958 diff_state.update(cx, |diff_state, cx| {
2959 use proto::update_diff_bases::Mode;
2960
2961 if let Some((diff_bases_change, (client, project_id))) =
2962 diff_bases_change.clone().zip(downstream_client)
2963 {
2964 let (staged_text, committed_text, mode) = match diff_bases_change {
2965 DiffBasesChange::SetIndex(index) => {
2966 (index, None, Mode::IndexOnly)
2967 }
2968 DiffBasesChange::SetHead(head) => (None, head, Mode::HeadOnly),
2969 DiffBasesChange::SetEach { index, head } => {
2970 (index, head, Mode::IndexAndHead)
2971 }
2972 DiffBasesChange::SetBoth(text) => {
2973 (None, text, Mode::IndexMatchesHead)
2974 }
2975 };
2976 client
2977 .send(proto::UpdateDiffBases {
2978 project_id: project_id.to_proto(),
2979 buffer_id: buffer_id.to_proto(),
2980 staged_text,
2981 committed_text,
2982 mode: mode as i32,
2983 })
2984 .log_err();
2985 }
2986
2987 diff_state.diff_bases_changed(buffer_snapshot, diff_bases_change, cx);
2988 });
2989 }
2990 })
2991 },
2992 );
2993 }
2994
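    /// Enqueues a job on this repository's background git worker and returns a
    /// receiver for its result. A minimal sketch of a caller is shown below; the
    /// closure body is illustrative only, not a doctest:
    ///
    /// ```ignore
    /// let rx = repository.send_job(Some("git branch --list".into()), |state, _cx| async move {
    ///     match state {
    ///         RepositoryState::Local { backend, .. } => backend.branches().await,
    ///         RepositoryState::Remote { .. } => anyhow::bail!("only supported locally"),
    ///     }
    /// });
    /// ```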
2995 pub fn send_job<F, Fut, R>(
2996 &mut self,
2997 status: Option<SharedString>,
2998 job: F,
2999 ) -> oneshot::Receiver<R>
3000 where
3001 F: FnOnce(RepositoryState, AsyncApp) -> Fut + 'static,
3002 Fut: Future<Output = R> + 'static,
3003 R: Send + 'static,
3004 {
3005 self.send_keyed_job(None, status, job)
3006 }
3007
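    /// Like [`Self::send_job`], but tags the job with an optional `GitJobKey` so the
    /// worker can recognize jobs that target the same logical operation, such as
    /// successive index writes for a single path.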
3008 fn send_keyed_job<F, Fut, R>(
3009 &mut self,
3010 key: Option<GitJobKey>,
3011 status: Option<SharedString>,
3012 job: F,
3013 ) -> oneshot::Receiver<R>
3014 where
3015 F: FnOnce(RepositoryState, AsyncApp) -> Fut + 'static,
3016 Fut: Future<Output = R> + 'static,
3017 R: Send + 'static,
3018 {
3019 let (result_tx, result_rx) = futures::channel::oneshot::channel();
3020 let job_id = post_inc(&mut self.job_id);
3021 let this = self.this.clone();
3022 self.job_sender
3023 .unbounded_send(GitJob {
3024 key,
3025 job: Box::new(move |state, cx: &mut AsyncApp| {
3026 let job = job(state, cx.clone());
3027 cx.spawn(async move |cx| {
3028 if let Some(s) = status.clone() {
3029 this.update(cx, |this, cx| {
3030 this.active_jobs.insert(
3031 job_id,
3032 JobInfo {
3033 start: Instant::now(),
3034 message: s.clone(),
3035 },
3036 );
3037
3038 cx.notify();
3039 })
3040 .ok();
3041 }
3042 let result = job.await;
3043
3044 this.update(cx, |this, cx| {
3045 this.active_jobs.remove(&job_id);
3046 cx.notify();
3047 })
3048 .ok();
3049
3050 result_tx.send(result).ok();
3051 })
3052 }),
3053 })
3054 .ok();
3055 result_rx
3056 }
3057
3058 pub fn set_as_active_repository(&self, cx: &mut Context<Self>) {
3059 let Some(git_store) = self.git_store.upgrade() else {
3060 return;
3061 };
3062 let entity = cx.entity();
3063 git_store.update(cx, |git_store, cx| {
3064 let Some((&id, _)) = git_store
3065 .repositories
3066 .iter()
3067 .find(|(_, handle)| *handle == &entity)
3068 else {
3069 return;
3070 };
3071 git_store.active_repo_id = Some(id);
3072 cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
3073 });
3074 }
3075
3076 pub fn cached_status(&self) -> impl '_ + Iterator<Item = StatusEntry> {
3077 self.snapshot.status()
3078 }
3079
3080 pub fn repo_path_to_project_path(&self, path: &RepoPath, cx: &App) -> Option<ProjectPath> {
3081 let git_store = self.git_store.upgrade()?;
3082 let worktree_store = git_store.read(cx).worktree_store.read(cx);
3083 let abs_path = self.snapshot.work_directory_abs_path.join(&path.0);
3084 let (worktree, relative_path) = worktree_store.find_worktree(abs_path, cx)?;
3085 Some(ProjectPath {
3086 worktree_id: worktree.read(cx).id(),
3087 path: relative_path.into(),
3088 })
3089 }
3090
3091 pub fn project_path_to_repo_path(&self, path: &ProjectPath, cx: &App) -> Option<RepoPath> {
3092 let git_store = self.git_store.upgrade()?;
3093 let worktree_store = git_store.read(cx).worktree_store.read(cx);
3094 let abs_path = worktree_store.absolutize(path, cx)?;
3095 self.snapshot.abs_path_to_repo_path(&abs_path)
3096 }
3097
3098 pub fn contains_sub_repo(&self, other: &Entity<Self>, cx: &App) -> bool {
3099 other
3100 .read(cx)
3101 .snapshot
3102 .work_directory_abs_path
3103 .starts_with(&self.snapshot.work_directory_abs_path)
3104 }
3105
3106 pub fn open_commit_buffer(
3107 &mut self,
3108 languages: Option<Arc<LanguageRegistry>>,
3109 buffer_store: Entity<BufferStore>,
3110 cx: &mut Context<Self>,
3111 ) -> Task<Result<Entity<Buffer>>> {
3112 let id = self.id;
3113 if let Some(buffer) = self.commit_message_buffer.clone() {
3114 return Task::ready(Ok(buffer));
3115 }
3116 let this = cx.weak_entity();
3117
3118 let rx = self.send_job(None, move |state, mut cx| async move {
3119 let Some(this) = this.upgrade() else {
3120 bail!("git store was dropped");
3121 };
3122 match state {
3123 RepositoryState::Local { .. } => {
3124 this.update(&mut cx, |_, cx| {
3125 Self::open_local_commit_buffer(languages, buffer_store, cx)
3126 })?
3127 .await
3128 }
3129 RepositoryState::Remote { project_id, client } => {
3130 let request = client.request(proto::OpenCommitMessageBuffer {
3131 project_id: project_id.0,
3132 repository_id: id.to_proto(),
3133 });
3134 let response = request.await.context("requesting to open commit buffer")?;
3135 let buffer_id = BufferId::new(response.buffer_id)?;
3136 let buffer = buffer_store
3137 .update(&mut cx, |buffer_store, cx| {
3138 buffer_store.wait_for_remote_buffer(buffer_id, cx)
3139 })?
3140 .await?;
3141 if let Some(language_registry) = languages {
3142 let git_commit_language =
3143 language_registry.language_for_name("Git Commit").await?;
3144 buffer.update(&mut cx, |buffer, cx| {
3145 buffer.set_language(Some(git_commit_language), cx);
3146 })?;
3147 }
3148 this.update(&mut cx, |this, _| {
3149 this.commit_message_buffer = Some(buffer.clone());
3150 })?;
3151 Ok(buffer)
3152 }
3153 }
3154 });
3155
        cx.spawn(async move |_, _| rx.await?)
3157 }
3158
3159 fn open_local_commit_buffer(
3160 language_registry: Option<Arc<LanguageRegistry>>,
3161 buffer_store: Entity<BufferStore>,
3162 cx: &mut Context<Self>,
3163 ) -> Task<Result<Entity<Buffer>>> {
3164 cx.spawn(async move |repository, cx| {
3165 let buffer = buffer_store
3166 .update(cx, |buffer_store, cx| buffer_store.create_buffer(cx))?
3167 .await?;
3168
3169 if let Some(language_registry) = language_registry {
3170 let git_commit_language = language_registry.language_for_name("Git Commit").await?;
3171 buffer.update(cx, |buffer, cx| {
3172 buffer.set_language(Some(git_commit_language), cx);
3173 })?;
3174 }
3175
3176 repository.update(cx, |repository, _| {
3177 repository.commit_message_buffer = Some(buffer.clone());
3178 })?;
3179 Ok(buffer)
3180 })
3181 }
3182
3183 pub fn checkout_files(
3184 &mut self,
3185 commit: &str,
3186 paths: Vec<RepoPath>,
3187 _cx: &mut App,
3188 ) -> oneshot::Receiver<Result<()>> {
3189 let commit = commit.to_string();
3190 let id = self.id;
3191
3192 self.send_job(
3193 Some(format!("git checkout {}", commit).into()),
3194 move |git_repo, _| async move {
3195 match git_repo {
3196 RepositoryState::Local {
3197 backend,
3198 environment,
3199 ..
3200 } => {
3201 backend
3202 .checkout_files(commit, paths, environment.clone())
3203 .await
3204 }
3205 RepositoryState::Remote { project_id, client } => {
3206 client
3207 .request(proto::GitCheckoutFiles {
3208 project_id: project_id.0,
3209 repository_id: id.to_proto(),
3210 commit,
3211 paths: paths
3212 .into_iter()
3213 .map(|p| p.to_string_lossy().to_string())
3214 .collect(),
3215 })
3216 .await?;
3217
3218 Ok(())
3219 }
3220 }
3221 },
3222 )
3223 }
3224
3225 pub fn reset(
3226 &mut self,
3227 commit: String,
3228 reset_mode: ResetMode,
3229 _cx: &mut App,
3230 ) -> oneshot::Receiver<Result<()>> {
3232 let id = self.id;
3233
3234 self.send_job(None, move |git_repo, _| async move {
3235 match git_repo {
3236 RepositoryState::Local {
3237 backend,
3238 environment,
3239 ..
3240 } => backend.reset(commit, reset_mode, environment).await,
3241 RepositoryState::Remote { project_id, client } => {
3242 client
3243 .request(proto::GitReset {
3244 project_id: project_id.0,
3245 repository_id: id.to_proto(),
3246 commit,
3247 mode: match reset_mode {
3248 ResetMode::Soft => git_reset::ResetMode::Soft.into(),
3249 ResetMode::Mixed => git_reset::ResetMode::Mixed.into(),
3250 },
3251 })
3252 .await?;
3253
3254 Ok(())
3255 }
3256 }
3257 })
3258 }
3259
3260 pub fn show(&mut self, commit: String) -> oneshot::Receiver<Result<CommitDetails>> {
3261 let id = self.id;
3262 self.send_job(None, move |git_repo, _cx| async move {
3263 match git_repo {
3264 RepositoryState::Local { backend, .. } => backend.show(commit).await,
3265 RepositoryState::Remote { project_id, client } => {
3266 let resp = client
3267 .request(proto::GitShow {
3268 project_id: project_id.0,
3269 repository_id: id.to_proto(),
3270 commit,
3271 })
3272 .await?;
3273
3274 Ok(CommitDetails {
3275 sha: resp.sha.into(),
3276 message: resp.message.into(),
3277 commit_timestamp: resp.commit_timestamp,
3278 author_email: resp.author_email.into(),
3279 author_name: resp.author_name.into(),
3280 })
3281 }
3282 }
3283 })
3284 }
3285
3286 pub fn load_commit_diff(&mut self, commit: String) -> oneshot::Receiver<Result<CommitDiff>> {
3287 let id = self.id;
3288 self.send_job(None, move |git_repo, cx| async move {
3289 match git_repo {
3290 RepositoryState::Local { backend, .. } => backend.load_commit(commit, cx).await,
3291 RepositoryState::Remote {
3292 client, project_id, ..
3293 } => {
3294 let response = client
3295 .request(proto::LoadCommitDiff {
3296 project_id: project_id.0,
3297 repository_id: id.to_proto(),
3298 commit,
3299 })
3300 .await?;
3301 Ok(CommitDiff {
3302 files: response
3303 .files
3304 .into_iter()
3305 .map(|file| CommitFile {
3306 path: Path::new(&file.path).into(),
3307 old_text: file.old_text,
3308 new_text: file.new_text,
3309 })
3310 .collect(),
3311 })
3312 }
3313 }
3314 })
3315 }
3316
3317 fn buffer_store(&self, cx: &App) -> Option<Entity<BufferStore>> {
3318 Some(self.git_store.upgrade()?.read(cx).buffer_store.clone())
3319 }
3320
3321 pub fn stage_entries(
3322 &self,
3323 entries: Vec<RepoPath>,
3324 cx: &mut Context<Self>,
3325 ) -> Task<anyhow::Result<()>> {
3326 if entries.is_empty() {
3327 return Task::ready(Ok(()));
3328 }
3329 let id = self.id;
3330
3331 let mut save_futures = Vec::new();
3332 if let Some(buffer_store) = self.buffer_store(cx) {
3333 buffer_store.update(cx, |buffer_store, cx| {
3334 for path in &entries {
3335 let Some(project_path) = self.repo_path_to_project_path(path, cx) else {
3336 continue;
3337 };
3338 if let Some(buffer) = buffer_store.get_by_path(&project_path) {
3339 if buffer
3340 .read(cx)
3341 .file()
3342 .map_or(false, |file| file.disk_state().exists())
3343 {
3344 save_futures.push(buffer_store.save_buffer(buffer, cx));
3345 }
3346 }
3347 }
3348 })
3349 }
3350
3351 cx.spawn(async move |this, cx| {
3352 for save_future in save_futures {
3353 save_future.await?;
3354 }
3355
3356 this.update(cx, |this, _| {
3357 this.send_job(None, move |git_repo, _cx| async move {
3358 match git_repo {
3359 RepositoryState::Local {
3360 backend,
3361 environment,
3362 ..
3363 } => backend.stage_paths(entries, environment.clone()).await,
3364 RepositoryState::Remote { project_id, client } => {
3365 client
3366 .request(proto::Stage {
3367 project_id: project_id.0,
3368 repository_id: id.to_proto(),
3369 paths: entries
3370 .into_iter()
3371 .map(|repo_path| repo_path.as_ref().to_proto())
3372 .collect(),
3373 })
3374 .await
3375 .context("sending stage request")?;
3376
3377 Ok(())
3378 }
3379 }
3380 })
3381 })?
3382 .await??;
3383
3384 Ok(())
3385 })
3386 }
3387
3388 pub fn unstage_entries(
3389 &self,
3390 entries: Vec<RepoPath>,
3391 cx: &mut Context<Self>,
3392 ) -> Task<anyhow::Result<()>> {
3393 if entries.is_empty() {
3394 return Task::ready(Ok(()));
3395 }
3396 let id = self.id;
3397
3398 let mut save_futures = Vec::new();
3399 if let Some(buffer_store) = self.buffer_store(cx) {
3400 buffer_store.update(cx, |buffer_store, cx| {
3401 for path in &entries {
3402 let Some(project_path) = self.repo_path_to_project_path(path, cx) else {
3403 continue;
3404 };
3405 if let Some(buffer) = buffer_store.get_by_path(&project_path) {
3406 if buffer
3407 .read(cx)
3408 .file()
3409 .map_or(false, |file| file.disk_state().exists())
3410 {
3411 save_futures.push(buffer_store.save_buffer(buffer, cx));
3412 }
3413 }
3414 }
3415 })
3416 }
3417
3418 cx.spawn(async move |this, cx| {
3419 for save_future in save_futures {
3420 save_future.await?;
3421 }
3422
3423 this.update(cx, |this, _| {
3424 this.send_job(None, move |git_repo, _cx| async move {
3425 match git_repo {
3426 RepositoryState::Local {
3427 backend,
3428 environment,
3429 ..
3430 } => backend.unstage_paths(entries, environment).await,
3431 RepositoryState::Remote { project_id, client } => {
3432 client
3433 .request(proto::Unstage {
3434 project_id: project_id.0,
3435 repository_id: id.to_proto(),
3436 paths: entries
3437 .into_iter()
3438 .map(|repo_path| repo_path.as_ref().to_proto())
3439 .collect(),
3440 })
3441 .await
3442 .context("sending unstage request")?;
3443
3444 Ok(())
3445 }
3446 }
3447 })
3448 })?
3449 .await??;
3450
3451 Ok(())
3452 })
3453 }
3454
3455 pub fn stage_all(&self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
3456 let to_stage = self
3457 .cached_status()
3458 .filter(|entry| !entry.status.staging().is_fully_staged())
3459 .map(|entry| entry.repo_path.clone())
3460 .collect();
3461 self.stage_entries(to_stage, cx)
3462 }
3463
3464 pub fn unstage_all(&self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
3465 let to_unstage = self
3466 .cached_status()
3467 .filter(|entry| entry.status.staging().has_staged())
3468 .map(|entry| entry.repo_path.clone())
3469 .collect();
3470 self.unstage_entries(to_unstage, cx)
3471 }
3472
3473 pub fn commit(
3474 &mut self,
3475 message: SharedString,
3476 name_and_email: Option<(SharedString, SharedString)>,
3477 options: CommitOptions,
3478 askpass: AskPassDelegate,
3479 _cx: &mut App,
3480 ) -> oneshot::Receiver<Result<()>> {
3481 let id = self.id;
3482 let askpass_delegates = self.askpass_delegates.clone();
3483 let askpass_id = util::post_inc(&mut self.latest_askpass_id);
3484
3485 self.send_job(Some("git commit".into()), move |git_repo, cx| async move {
3486 match git_repo {
3487 RepositoryState::Local {
3488 backend,
3489 environment,
3490 ..
3491 } => {
3492 backend
3493 .commit(message, name_and_email, options, askpass, environment, cx)
3494 .await
3495 }
3496 RepositoryState::Remote { project_id, client } => {
3497 askpass_delegates.lock().insert(askpass_id, askpass);
3498 let _defer = util::defer(|| {
3499 let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
3500 debug_assert!(askpass_delegate.is_some());
3501 });
3502
3503 let (name, email) = name_and_email.unzip();
3504 client
3505 .request(proto::Commit {
3506 project_id: project_id.0,
3507 repository_id: id.to_proto(),
3508 message: String::from(message),
3509 name: name.map(String::from),
3510 email: email.map(String::from),
3511 options: Some(proto::commit::CommitOptions {
3512 amend: options.amend,
3513 }),
3514 askpass_id: Some(askpass_id),
3515 })
3516 .await?;
3517
3518 Ok(())
3519 }
3520 }
3521 })
3522 }
3523
3524 pub fn fetch(
3525 &mut self,
3526 fetch_options: FetchOptions,
3527 askpass: AskPassDelegate,
3528 _cx: &mut App,
3529 ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
3530 let askpass_delegates = self.askpass_delegates.clone();
3531 let askpass_id = util::post_inc(&mut self.latest_askpass_id);
3532 let id = self.id;
3533
3534 self.send_job(Some("git fetch".into()), move |git_repo, cx| async move {
3535 match git_repo {
3536 RepositoryState::Local {
3537 backend,
3538 environment,
3539 ..
3540 } => backend.fetch(fetch_options, askpass, environment, cx).await,
3541 RepositoryState::Remote { project_id, client } => {
3542 askpass_delegates.lock().insert(askpass_id, askpass);
3543 let _defer = util::defer(|| {
3544 let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
3545 debug_assert!(askpass_delegate.is_some());
3546 });
3547
3548 let response = client
3549 .request(proto::Fetch {
3550 project_id: project_id.0,
3551 repository_id: id.to_proto(),
3552 askpass_id,
3553 remote: fetch_options.to_proto(),
3554 })
3555 .await
3556 .context("sending fetch request")?;
3557
3558 Ok(RemoteCommandOutput {
3559 stdout: response.stdout,
3560 stderr: response.stderr,
3561 })
3562 }
3563 }
3564 })
3565 }
3566
3567 pub fn push(
3568 &mut self,
3569 branch: SharedString,
3570 remote: SharedString,
3571 options: Option<PushOptions>,
3572 askpass: AskPassDelegate,
3573 cx: &mut Context<Self>,
3574 ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
3575 let askpass_delegates = self.askpass_delegates.clone();
3576 let askpass_id = util::post_inc(&mut self.latest_askpass_id);
3577 let id = self.id;
3578
3579 let args = options
3580 .map(|option| match option {
3581 PushOptions::SetUpstream => " --set-upstream",
3582 PushOptions::Force => " --force-with-lease",
3583 })
3584 .unwrap_or("");
3585
3586 let updates_tx = self
3587 .git_store()
3588 .and_then(|git_store| match &git_store.read(cx).state {
3589 GitStoreState::Local { downstream, .. } => downstream
3590 .as_ref()
3591 .map(|downstream| downstream.updates_tx.clone()),
3592 _ => None,
3593 });
3594
3595 let this = cx.weak_entity();
3596 self.send_job(
            Some(format!("git push{} {} {}", args, remote, branch).into()),
3598 move |git_repo, mut cx| async move {
3599 match git_repo {
3600 RepositoryState::Local {
3601 backend,
3602 environment,
3603 ..
3604 } => {
3605 let result = backend
3606 .push(
3607 branch.to_string(),
3608 remote.to_string(),
3609 options,
3610 askpass,
3611 environment.clone(),
3612 cx.clone(),
3613 )
3614 .await;
3615 if result.is_ok() {
3616 let branches = backend.branches().await?;
3617 let branch = branches.into_iter().find(|branch| branch.is_head);
3618 log::info!("head branch after scan is {branch:?}");
3619 let snapshot = this.update(&mut cx, |this, cx| {
3620 this.snapshot.branch = branch;
3621 let snapshot = this.snapshot.clone();
3622 cx.emit(RepositoryEvent::Updated {
3623 full_scan: false,
3624 new_instance: false,
3625 });
3626 snapshot
3627 })?;
3628 if let Some(updates_tx) = updates_tx {
3629 updates_tx
3630 .unbounded_send(DownstreamUpdate::UpdateRepository(snapshot))
3631 .ok();
3632 }
3633 }
3634 result
3635 }
3636 RepositoryState::Remote { project_id, client } => {
3637 askpass_delegates.lock().insert(askpass_id, askpass);
3638 let _defer = util::defer(|| {
3639 let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
3640 debug_assert!(askpass_delegate.is_some());
3641 });
3642 let response = client
3643 .request(proto::Push {
3644 project_id: project_id.0,
3645 repository_id: id.to_proto(),
3646 askpass_id,
3647 branch_name: branch.to_string(),
3648 remote_name: remote.to_string(),
3649 options: options.map(|options| match options {
3650 PushOptions::Force => proto::push::PushOptions::Force,
3651 PushOptions::SetUpstream => {
3652 proto::push::PushOptions::SetUpstream
3653 }
3654 }
3655 as i32),
3656 })
3657 .await
3658 .context("sending push request")?;
3659
3660 Ok(RemoteCommandOutput {
3661 stdout: response.stdout,
3662 stderr: response.stderr,
3663 })
3664 }
3665 }
3666 },
3667 )
3668 }
3669
3670 pub fn pull(
3671 &mut self,
3672 branch: SharedString,
3673 remote: SharedString,
3674 askpass: AskPassDelegate,
3675 _cx: &mut App,
3676 ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
3677 let askpass_delegates = self.askpass_delegates.clone();
3678 let askpass_id = util::post_inc(&mut self.latest_askpass_id);
3679 let id = self.id;
3680
3681 self.send_job(
3682 Some(format!("git pull {} {}", remote, branch).into()),
3683 move |git_repo, cx| async move {
3684 match git_repo {
3685 RepositoryState::Local {
3686 backend,
3687 environment,
3688 ..
3689 } => {
3690 backend
3691 .pull(
3692 branch.to_string(),
3693 remote.to_string(),
3694 askpass,
3695 environment.clone(),
3696 cx,
3697 )
3698 .await
3699 }
3700 RepositoryState::Remote { project_id, client } => {
3701 askpass_delegates.lock().insert(askpass_id, askpass);
3702 let _defer = util::defer(|| {
3703 let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
3704 debug_assert!(askpass_delegate.is_some());
3705 });
3706 let response = client
3707 .request(proto::Pull {
3708 project_id: project_id.0,
3709 repository_id: id.to_proto(),
3710 askpass_id,
3711 branch_name: branch.to_string(),
3712 remote_name: remote.to_string(),
3713 })
3714 .await
3715 .context("sending pull request")?;
3716
3717 Ok(RemoteCommandOutput {
3718 stdout: response.stdout,
3719 stderr: response.stderr,
3720 })
3721 }
3722 }
3723 },
3724 )
3725 }
3726
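    /// Queues a keyed job that writes `content` to the index entry for `path`, either
    /// locally or over RPC, and then records the hunk staging operation count that
    /// this write corresponds to, so that later diff recalculations can tell when the
    /// index has settled.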
3727 fn spawn_set_index_text_job(
3728 &mut self,
3729 path: RepoPath,
3730 content: Option<String>,
3731 hunk_staging_operation_count: Option<usize>,
3732 cx: &mut Context<Self>,
3733 ) -> oneshot::Receiver<anyhow::Result<()>> {
3734 let id = self.id;
3735 let this = cx.weak_entity();
3736 let git_store = self.git_store.clone();
3737 self.send_keyed_job(
3738 Some(GitJobKey::WriteIndex(path.clone())),
3739 None,
3740 move |git_repo, mut cx| async move {
3741 log::debug!("start updating index text for buffer {}", path.display());
3742 match git_repo {
3743 RepositoryState::Local {
3744 backend,
3745 environment,
3746 ..
3747 } => {
3748 backend
3749 .set_index_text(path.clone(), content, environment.clone())
3750 .await?;
3751 }
3752 RepositoryState::Remote { project_id, client } => {
3753 client
3754 .request(proto::SetIndexText {
3755 project_id: project_id.0,
3756 repository_id: id.to_proto(),
3757 path: path.as_ref().to_proto(),
3758 text: content,
3759 })
3760 .await?;
3761 }
3762 }
3763 log::debug!("finish updating index text for buffer {}", path.display());
3764
3765 if let Some(hunk_staging_operation_count) = hunk_staging_operation_count {
3766 let project_path = this
3767 .read_with(&cx, |this, cx| this.repo_path_to_project_path(&path, cx))
3768 .ok()
3769 .flatten();
3770 git_store.update(&mut cx, |git_store, cx| {
3771 let buffer_id = git_store
3772 .buffer_store
3773 .read(cx)
3774 .get_by_path(&project_path?)?
3775 .read(cx)
3776 .remote_id();
3777 let diff_state = git_store.diffs.get(&buffer_id)?;
3778 diff_state.update(cx, |diff_state, _| {
3779 diff_state.hunk_staging_operation_count_as_of_write =
3780 hunk_staging_operation_count;
3781 });
3782 Some(())
3783 })?;
3784 }
3785 Ok(())
3786 },
3787 )
3788 }
3789
3790 pub fn get_remotes(
3791 &mut self,
3792 branch_name: Option<String>,
3793 ) -> oneshot::Receiver<Result<Vec<Remote>>> {
3794 let id = self.id;
3795 self.send_job(None, move |repo, _cx| async move {
3796 match repo {
3797 RepositoryState::Local { backend, .. } => backend.get_remotes(branch_name).await,
3798 RepositoryState::Remote { project_id, client } => {
3799 let response = client
3800 .request(proto::GetRemotes {
3801 project_id: project_id.0,
3802 repository_id: id.to_proto(),
3803 branch_name,
3804 })
3805 .await?;
3806
3807 let remotes = response
3808 .remotes
3809 .into_iter()
                        .map(|remote| git::repository::Remote {
                            name: remote.name.into(),
                        })
3813 .collect();
3814
3815 Ok(remotes)
3816 }
3817 }
3818 })
3819 }
3820
3821 pub fn branches(&mut self) -> oneshot::Receiver<Result<Vec<Branch>>> {
3822 let id = self.id;
3823 self.send_job(None, move |repo, _| async move {
3824 match repo {
3825 RepositoryState::Local { backend, .. } => backend.branches().await,
3826 RepositoryState::Remote { project_id, client } => {
3827 let response = client
3828 .request(proto::GitGetBranches {
3829 project_id: project_id.0,
3830 repository_id: id.to_proto(),
3831 })
3832 .await?;
3833
3834 let branches = response
3835 .branches
3836 .into_iter()
3837 .map(|branch| proto_to_branch(&branch))
3838 .collect();
3839
3840 Ok(branches)
3841 }
3842 }
3843 })
3844 }
3845
3846 pub fn diff(&mut self, diff_type: DiffType, _cx: &App) -> oneshot::Receiver<Result<String>> {
3847 let id = self.id;
3848 self.send_job(None, move |repo, _cx| async move {
3849 match repo {
3850 RepositoryState::Local { backend, .. } => backend.diff(diff_type).await,
3851 RepositoryState::Remote { project_id, client } => {
3852 let response = client
3853 .request(proto::GitDiff {
3854 project_id: project_id.0,
3855 repository_id: id.to_proto(),
3856 diff_type: match diff_type {
3857 DiffType::HeadToIndex => {
3858 proto::git_diff::DiffType::HeadToIndex.into()
3859 }
3860 DiffType::HeadToWorktree => {
3861 proto::git_diff::DiffType::HeadToWorktree.into()
3862 }
3863 },
3864 })
3865 .await?;
3866
3867 Ok(response.diff)
3868 }
3869 }
3870 })
3871 }
3872
3873 pub fn create_branch(&mut self, branch_name: String) -> oneshot::Receiver<Result<()>> {
3874 let id = self.id;
3875 self.send_job(
3876 Some(format!("git switch -c {branch_name}").into()),
3877 move |repo, _cx| async move {
3878 match repo {
3879 RepositoryState::Local { backend, .. } => {
3880 backend.create_branch(branch_name).await
3881 }
3882 RepositoryState::Remote { project_id, client } => {
3883 client
3884 .request(proto::GitCreateBranch {
3885 project_id: project_id.0,
3886 repository_id: id.to_proto(),
3887 branch_name,
3888 })
3889 .await?;
3890
3891 Ok(())
3892 }
3893 }
3894 },
3895 )
3896 }
3897
3898 pub fn change_branch(&mut self, branch_name: String) -> oneshot::Receiver<Result<()>> {
3899 let id = self.id;
3900 self.send_job(
3901 Some(format!("git switch {branch_name}").into()),
3902 move |repo, _cx| async move {
3903 match repo {
3904 RepositoryState::Local { backend, .. } => {
3905 backend.change_branch(branch_name).await
3906 }
3907 RepositoryState::Remote { project_id, client } => {
3908 client
3909 .request(proto::GitChangeBranch {
3910 project_id: project_id.0,
3911 repository_id: id.to_proto(),
3912 branch_name,
3913 })
3914 .await?;
3915
3916 Ok(())
3917 }
3918 }
3919 },
3920 )
3921 }
3922
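/// Returns the names of branches that the current commits appear to have been
/// pushed to, as reported by the backend (or, for remote projects, by the
/// host via `proto::CheckForPushedCommits`).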
3923 pub fn check_for_pushed_commits(&mut self) -> oneshot::Receiver<Result<Vec<SharedString>>> {
3924 let id = self.id;
3925 self.send_job(None, move |repo, _cx| async move {
3926 match repo {
3927 RepositoryState::Local { backend, .. } => backend.check_for_pushed_commit().await,
3928 RepositoryState::Remote { project_id, client } => {
3929 let response = client
3930 .request(proto::CheckForPushedCommits {
3931 project_id: project_id.0,
3932 repository_id: id.to_proto(),
3933 })
3934 .await?;
3935
3936 let branches = response.pushed_to.into_iter().map(Into::into).collect();
3937
3938 Ok(branches)
3939 }
3940 }
3941 })
3942 }
3943
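/// Captures the repository's current state as a checkpoint. Only local
/// repositories are supported; remote repositories currently bail.
///
/// A minimal usage sketch, assuming a `repo: Entity<Repository>` handle inside
/// an async task with a `cx: &mut AsyncApp` (these names are illustrative, not
/// part of this API):
///
/// ```ignore
/// let checkpoint = repo.update(cx, |repo, _| repo.checkpoint())?.await??;
/// // ...modify the working tree...
/// repo.update(cx, |repo, _| repo.restore_checkpoint(checkpoint))?.await??;
/// ```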
3944 pub fn checkpoint(&mut self) -> oneshot::Receiver<Result<GitRepositoryCheckpoint>> {
3945 self.send_job(None, |repo, _cx| async move {
3946 match repo {
3947 RepositoryState::Local { backend, .. } => backend.checkpoint().await,
3948 RepositoryState::Remote { .. } => anyhow::bail!("not implemented yet"),
3949 }
3950 })
3951 }
3952
3953 pub fn restore_checkpoint(
3954 &mut self,
3955 checkpoint: GitRepositoryCheckpoint,
3956 ) -> oneshot::Receiver<Result<()>> {
3957 self.send_job(None, move |repo, _cx| async move {
3958 match repo {
3959 RepositoryState::Local { backend, .. } => {
3960 backend.restore_checkpoint(checkpoint).await
3961 }
3962 RepositoryState::Remote { .. } => anyhow::bail!("not implemented yet"),
3963 }
3964 })
3965 }
3966
3967 pub(crate) fn apply_remote_update(
3968 &mut self,
3969 update: proto::UpdateRepository,
3970 is_new: bool,
3971 cx: &mut Context<Self>,
3972 ) -> Result<()> {
3973 let conflicted_paths = TreeSet::from_ordered_entries(
3974 update
3975 .current_merge_conflicts
3976 .into_iter()
3977 .map(|path| RepoPath(Path::new(&path).into())),
3978 );
3979 self.snapshot.branch = update.branch_summary.as_ref().map(proto_to_branch);
3980 self.snapshot.head_commit = update
3981 .head_commit_details
3982 .as_ref()
3983 .map(proto_to_commit_details);
3984
3985 self.snapshot.merge.conflicted_paths = conflicted_paths;
3986
3987 let edits = update
3988 .removed_statuses
3989 .into_iter()
3990 .map(|path| sum_tree::Edit::Remove(PathKey(FromProto::from_proto(path))))
3991 .chain(
3992 update
3993 .updated_statuses
3994 .into_iter()
3995 .filter_map(|updated_status| {
3996 Some(sum_tree::Edit::Insert(updated_status.try_into().log_err()?))
3997 }),
3998 )
3999 .collect::<Vec<_>>();
4000 self.snapshot.statuses_by_path.edit(edits, &());
4001 if update.is_last_update {
4002 self.snapshot.scan_id = update.scan_id;
4003 }
4004 cx.emit(RepositoryEvent::Updated {
4005 full_scan: true,
4006 new_instance: is_new,
4007 });
4008 Ok(())
4009 }
4010
4011 pub fn compare_checkpoints(
4012 &mut self,
4013 left: GitRepositoryCheckpoint,
4014 right: GitRepositoryCheckpoint,
4015 ) -> oneshot::Receiver<Result<bool>> {
4016 self.send_job(None, move |repo, _cx| async move {
4017 match repo {
4018 RepositoryState::Local { backend, .. } => {
4019 backend.compare_checkpoints(left, right).await
4020 }
4021 RepositoryState::Remote { .. } => anyhow::bail!("not implemented yet"),
4022 }
4023 })
4024 }
4025
4026 pub fn diff_checkpoints(
4027 &mut self,
4028 base_checkpoint: GitRepositoryCheckpoint,
4029 target_checkpoint: GitRepositoryCheckpoint,
4030 ) -> oneshot::Receiver<Result<String>> {
4031 self.send_job(None, move |repo, _cx| async move {
4032 match repo {
4033 RepositoryState::Local { backend, .. } => {
4034 backend
4035 .diff_checkpoints(base_checkpoint, target_checkpoint)
4036 .await
4037 }
4038 RepositoryState::Remote { .. } => anyhow::bail!("not implemented yet"),
4039 }
4040 })
4041 }
4042
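/// Queues a full git status scan. The job is keyed with
/// `GitJobKey::ReloadGitState`, so if another scan is already queued behind it
/// this one is skipped and only the most recent request runs. On completion
/// the freshly computed snapshot is applied, its events are emitted, and, if
/// `updates_tx` is present, the snapshot is forwarded downstream as an
/// `UpdateRepository` message. Only local repositories can be scanned.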
4043 fn schedule_scan(
4044 &mut self,
4045 updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
4046 cx: &mut Context<Self>,
4047 ) {
4048 let this = cx.weak_entity();
4049 let _ = self.send_keyed_job(
4050 Some(GitJobKey::ReloadGitState),
4051 None,
4052 |state, mut cx| async move {
4053 log::debug!("running scheduled git status scan");
4054
4055 let Some(this) = this.upgrade() else {
4056 return Ok(());
4057 };
4058 let RepositoryState::Local { backend, .. } = state else {
4059 bail!("not a local repository")
4060 };
4061 let (snapshot, events) = this
4062 .read_with(&mut cx, |this, _| {
4063 compute_snapshot(
4064 this.id,
4065 this.work_directory_abs_path.clone(),
4066 this.snapshot.clone(),
4067 backend.clone(),
4068 )
4069 })?
4070 .await?;
4071 this.update(&mut cx, |this, cx| {
4072 this.snapshot = snapshot.clone();
4073 for event in events {
4074 cx.emit(event);
4075 }
4076 })?;
4077 if let Some(updates_tx) = updates_tx {
4078 updates_tx
4079 .unbounded_send(DownstreamUpdate::UpdateRepository(snapshot))
4080 .ok();
4081 }
4082 Ok(())
4083 },
4084 );
4085 }
4086
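/// Spawns the background worker that services git jobs for a local
/// repository: it resolves the working-directory environment (falling back to
/// an empty environment on failure), opens the repository backend via
/// `fs.open_repo`, registers additional git hosting providers when a registry
/// is available, and then drains the job queue, coalescing keyed jobs so that
/// only the most recently queued job with a given key runs.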
4087 fn spawn_local_git_worker(
4088 work_directory_abs_path: Arc<Path>,
4089 dot_git_abs_path: Arc<Path>,
4090 _repository_dir_abs_path: Arc<Path>,
4091 _common_dir_abs_path: Arc<Path>,
4092 project_environment: WeakEntity<ProjectEnvironment>,
4093 fs: Arc<dyn Fs>,
4094 cx: &mut Context<Self>,
4095 ) -> mpsc::UnboundedSender<GitJob> {
4096 let (job_tx, mut job_rx) = mpsc::unbounded::<GitJob>();
4097
4098 cx.spawn(async move |_, cx| {
4099 let environment = project_environment
4100 .upgrade()
4101 .context("missing project environment")?
4102 .update(cx, |project_environment, cx| {
4103 project_environment.get_directory_environment(work_directory_abs_path.clone(), cx)
4104 })?
4105 .await
4106 .unwrap_or_else(|| {
4107 log::error!("failed to get working directory environment for repository {work_directory_abs_path:?}");
4108 HashMap::default()
4109 });
4110 let backend = cx
4111 .background_spawn(async move {
4112 fs.open_repo(&dot_git_abs_path)
4113 .with_context(|| format!("opening repository at {dot_git_abs_path:?}"))
4114 })
4115 .await?;
4116
4117 if let Some(git_hosting_provider_registry) =
4118 cx.update(|cx| GitHostingProviderRegistry::try_global(cx))?
4119 {
4120 git_hosting_providers::register_additional_providers(
4121 git_hosting_provider_registry,
4122 backend.clone(),
4123 );
4124 }
4125
4126 let state = RepositoryState::Local {
4127 backend,
4128 environment: Arc::new(environment),
4129 };
4130 let mut jobs = VecDeque::new();
4131 loop {
4132 while let Ok(Some(next_job)) = job_rx.try_next() {
4133 jobs.push_back(next_job);
4134 }
4135
4136 if let Some(job) = jobs.pop_front() {
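// If a newer job with the same key is already waiting in the queue, drop
// this one: only the most recently queued keyed job (e.g. a status
// refresh) needs to run.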
4137 if let Some(current_key) = &job.key {
4138 if jobs
4139 .iter()
4140 .any(|other_job| other_job.key.as_ref() == Some(current_key))
4141 {
4142 continue;
4143 }
4144 }
4145 (job.job)(state.clone(), cx).await;
4146 } else if let Some(job) = job_rx.next().await {
4147 jobs.push_back(job);
4148 } else {
4149 break;
4150 }
4151 }
4152 anyhow::Ok(())
4153 })
4154 .detach_and_log_err(cx);
4155
4156 job_tx
4157 }
4158
4159 fn spawn_remote_git_worker(
4160 project_id: ProjectId,
4161 client: AnyProtoClient,
4162 cx: &mut Context<Self>,
4163 ) -> mpsc::UnboundedSender<GitJob> {
4164 let (job_tx, mut job_rx) = mpsc::unbounded::<GitJob>();
4165
4166 cx.spawn(async move |_, cx| {
4167 let state = RepositoryState::Remote { project_id, client };
4168 let mut jobs = VecDeque::new();
4169 loop {
4170 while let Ok(Some(next_job)) = job_rx.try_next() {
4171 jobs.push_back(next_job);
4172 }
4173
4174 if let Some(job) = jobs.pop_front() {
4175 if let Some(current_key) = &job.key {
4176 if jobs
4177 .iter()
4178 .any(|other_job| other_job.key.as_ref() == Some(current_key))
4179 {
4180 continue;
4181 }
4182 }
4183 (job.job)(state.clone(), cx).await;
4184 } else if let Some(job) = job_rx.next().await {
4185 jobs.push_back(job);
4186 } else {
4187 break;
4188 }
4189 }
4190 anyhow::Ok(())
4191 })
4192 .detach_and_log_err(cx);
4193
4194 job_tx
4195 }
4196
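/// Loads the staged (index) text for `repo_path`, if any. Remote projects
/// obtain it from the host via `proto::OpenUnstagedDiff`.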
4197 fn load_staged_text(
4198 &mut self,
4199 buffer_id: BufferId,
4200 repo_path: RepoPath,
4201 cx: &App,
4202 ) -> Task<Result<Option<String>>> {
4203 let rx = self.send_job(None, move |state, _| async move {
4204 match state {
4205 RepositoryState::Local { backend, .. } => {
4206 anyhow::Ok(backend.load_index_text(repo_path).await)
4207 }
4208 RepositoryState::Remote { project_id, client } => {
4209 let response = client
4210 .request(proto::OpenUnstagedDiff {
4211 project_id: project_id.to_proto(),
4212 buffer_id: buffer_id.to_proto(),
4213 })
4214 .await?;
4215 Ok(response.staged_text)
4216 }
4217 }
4218 });
4219 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
4220 }
4221
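/// Loads the committed (HEAD) and staged (index) text for `repo_path`. When
/// the two are identical the result collapses to `DiffBasesChange::SetBoth`;
/// otherwise both texts are returned via `DiffBasesChange::SetEach`. Remote
/// projects receive the same distinction through the `Mode` field of
/// `proto::OpenUncommittedDiff`'s response.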
4222 fn load_committed_text(
4223 &mut self,
4224 buffer_id: BufferId,
4225 repo_path: RepoPath,
4226 cx: &App,
4227 ) -> Task<Result<DiffBasesChange>> {
4228 let rx = self.send_job(None, move |state, _| async move {
4229 match state {
4230 RepositoryState::Local { backend, .. } => {
4231 let committed_text = backend.load_committed_text(repo_path.clone()).await;
4232 let staged_text = backend.load_index_text(repo_path).await;
4233 let diff_bases_change = if committed_text == staged_text {
4234 DiffBasesChange::SetBoth(committed_text)
4235 } else {
4236 DiffBasesChange::SetEach {
4237 index: staged_text,
4238 head: committed_text,
4239 }
4240 };
4241 anyhow::Ok(diff_bases_change)
4242 }
4243 RepositoryState::Remote { project_id, client } => {
4244 use proto::open_uncommitted_diff_response::Mode;
4245
4246 let response = client
4247 .request(proto::OpenUncommittedDiff {
4248 project_id: project_id.to_proto(),
4249 buffer_id: buffer_id.to_proto(),
4250 })
4251 .await?;
4252 let mode = Mode::from_i32(response.mode).context("Invalid mode")?;
4253 let bases = match mode {
4254 Mode::IndexMatchesHead => DiffBasesChange::SetBoth(response.committed_text),
4255 Mode::IndexAndHead => DiffBasesChange::SetEach {
4256 head: response.committed_text,
4257 index: response.staged_text,
4258 },
4259 };
4260 Ok(bases)
4261 }
4262 }
4263 });
4264
4265 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
4266 }
4267
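/// Records `paths` as needing a status refresh and queues a
/// `GitJobKey::RefreshStatuses` job. The job re-reads the status of all
/// accumulated paths from the backend, applies any entries that changed or
/// disappeared to the snapshot (bumping the scan id), notifies downstream
/// observers if `updates_tx` is present, and emits a partial `Updated` event.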
4268 fn paths_changed(
4269 &mut self,
4270 paths: Vec<RepoPath>,
4271 updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
4272 cx: &mut Context<Self>,
4273 ) {
4274 self.paths_needing_status_update.extend(paths);
4275
4276 let this = cx.weak_entity();
4277 let _ = self.send_keyed_job(
4278 Some(GitJobKey::RefreshStatuses),
4279 None,
4280 |state, mut cx| async move {
4281 let (prev_snapshot, mut changed_paths) = this.update(&mut cx, |this, _| {
4282 (
4283 this.snapshot.clone(),
4284 mem::take(&mut this.paths_needing_status_update),
4285 )
4286 })?;
4287 let RepositoryState::Local { backend, .. } = state else {
4288 bail!("not a local repository")
4289 };
4290
4291 let paths = changed_paths.iter().cloned().collect::<Vec<_>>();
4292 let statuses = backend.status(&paths).await?;
4293
4294 let changed_path_statuses = cx
4295 .background_spawn(async move {
4296 let mut changed_path_statuses = Vec::new();
4297 let prev_statuses = prev_snapshot.statuses_by_path.clone();
4298 let mut cursor = prev_statuses.cursor::<PathProgress>(&());
4299
4300 for (repo_path, status) in &*statuses.entries {
4301 changed_paths.remove(repo_path);
4302 if cursor.seek_forward(&PathTarget::Path(repo_path), Bias::Left, &()) {
4303 if cursor.item().is_some_and(|entry| entry.status == *status) {
4304 continue;
4305 }
4306 }
4307
4308 changed_path_statuses.push(Edit::Insert(StatusEntry {
4309 repo_path: repo_path.clone(),
4310 status: *status,
4311 }));
4312 }
4313 let mut cursor = prev_statuses.cursor::<PathProgress>(&());
4314 for path in changed_paths.into_iter() {
4315 if cursor.seek_forward(&PathTarget::Path(&path), Bias::Left, &()) {
4316 changed_path_statuses.push(Edit::Remove(PathKey(path.0)));
4317 }
4318 }
4319 changed_path_statuses
4320 })
4321 .await;
4322
4323 this.update(&mut cx, |this, cx| {
4324 if !changed_path_statuses.is_empty() {
4325 this.snapshot
4326 .statuses_by_path
4327 .edit(changed_path_statuses, &());
4328 this.snapshot.scan_id += 1;
4329 if let Some(updates_tx) = updates_tx {
4330 updates_tx
4331 .unbounded_send(DownstreamUpdate::UpdateRepository(
4332 this.snapshot.clone(),
4333 ))
4334 .ok();
4335 }
4336 }
4337 cx.emit(RepositoryEvent::Updated {
4338 full_scan: false,
4339 new_instance: false,
4340 });
4341 })
4342 },
4343 );
4344 }
4345
4346 /// Returns the currently running git command and when it started, if any.
4347 pub fn current_job(&self) -> Option<JobInfo> {
4348 self.active_jobs.values().next().cloned()
4349 }
4350
4351 pub fn barrier(&mut self) -> oneshot::Receiver<()> {
4352 self.send_job(None, |_, _| async {})
4353 }
4354}
4355
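/// Builds a permalink to a file inside a vendored crate from the Cargo
/// registry source cache: it walks up from `path` to find the crate's
/// `.cargo_vcs_info.json` (which records the commit sha and the crate's path
/// within its repository), reads `package.repository` from the adjacent
/// `Cargo.toml`, and asks the matching git hosting provider to build a
/// permalink for `selection` at that commit.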
4356fn get_permalink_in_rust_registry_src(
4357 provider_registry: Arc<GitHostingProviderRegistry>,
4358 path: PathBuf,
4359 selection: Range<u32>,
4360) -> Result<url::Url> {
4361 #[derive(Deserialize)]
4362 struct CargoVcsGit {
4363 sha1: String,
4364 }
4365
4366 #[derive(Deserialize)]
4367 struct CargoVcsInfo {
4368 git: CargoVcsGit,
4369 path_in_vcs: String,
4370 }
4371
4372 #[derive(Deserialize)]
4373 struct CargoPackage {
4374 repository: String,
4375 }
4376
4377 #[derive(Deserialize)]
4378 struct CargoToml {
4379 package: CargoPackage,
4380 }
4381
4382 let Some((dir, cargo_vcs_info_json)) = path.ancestors().skip(1).find_map(|dir| {
4383 let json = std::fs::read_to_string(dir.join(".cargo_vcs_info.json")).ok()?;
4384 Some((dir, json))
4385 }) else {
4386 bail!("No .cargo_vcs_info.json found in parent directories")
4387 };
4388 let cargo_vcs_info = serde_json::from_str::<CargoVcsInfo>(&cargo_vcs_info_json)?;
4389 let cargo_toml = std::fs::read_to_string(dir.join("Cargo.toml"))?;
4390 let manifest = toml::from_str::<CargoToml>(&cargo_toml)?;
4391 let (provider, remote) = parse_git_remote_url(provider_registry, &manifest.package.repository)
4392 .context("parsing package.repository field of manifest")?;
4393 let path = PathBuf::from(cargo_vcs_info.path_in_vcs).join(path.strip_prefix(dir).unwrap());
4394 let permalink = provider.build_permalink(
4395 remote,
4396 BuildPermalinkParams {
4397 sha: &cargo_vcs_info.git.sha1,
4398 path: &path.to_string_lossy(),
4399 selection: Some(selection),
4400 },
4401 );
4402 Ok(permalink)
4403}
4404
4405fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
4406 let Some(blame) = blame else {
4407 return proto::BlameBufferResponse {
4408 blame_response: None,
4409 };
4410 };
4411
4412 let entries = blame
4413 .entries
4414 .into_iter()
4415 .map(|entry| proto::BlameEntry {
4416 sha: entry.sha.as_bytes().into(),
4417 start_line: entry.range.start,
4418 end_line: entry.range.end,
4419 original_line_number: entry.original_line_number,
4420 author: entry.author.clone(),
4421 author_mail: entry.author_mail.clone(),
4422 author_time: entry.author_time,
4423 author_tz: entry.author_tz.clone(),
4424 committer: entry.committer_name.clone(),
4425 committer_mail: entry.committer_email.clone(),
4426 committer_time: entry.committer_time,
4427 committer_tz: entry.committer_tz.clone(),
4428 summary: entry.summary.clone(),
4429 previous: entry.previous.clone(),
4430 filename: entry.filename.clone(),
4431 })
4432 .collect::<Vec<_>>();
4433
4434 let messages = blame
4435 .messages
4436 .into_iter()
4437 .map(|(oid, message)| proto::CommitMessage {
4438 oid: oid.as_bytes().into(),
4439 message,
4440 })
4441 .collect::<Vec<_>>();
4442
4443 proto::BlameBufferResponse {
4444 blame_response: Some(proto::blame_buffer_response::BlameResponse {
4445 entries,
4446 messages,
4447 remote_url: blame.remote_url,
4448 }),
4449 }
4450}
4451
4452fn deserialize_blame_buffer_response(
4453 response: proto::BlameBufferResponse,
4454) -> Option<git::blame::Blame> {
4455 let response = response.blame_response?;
4456 let entries = response
4457 .entries
4458 .into_iter()
4459 .filter_map(|entry| {
4460 Some(git::blame::BlameEntry {
4461 sha: git::Oid::from_bytes(&entry.sha).ok()?,
4462 range: entry.start_line..entry.end_line,
4463 original_line_number: entry.original_line_number,
4464 committer_name: entry.committer,
4465 committer_time: entry.committer_time,
4466 committer_tz: entry.committer_tz,
4467 committer_email: entry.committer_mail,
4468 author: entry.author,
4469 author_mail: entry.author_mail,
4470 author_time: entry.author_time,
4471 author_tz: entry.author_tz,
4472 summary: entry.summary,
4473 previous: entry.previous,
4474 filename: entry.filename,
4475 })
4476 })
4477 .collect::<Vec<_>>();
4478
4479 let messages = response
4480 .messages
4481 .into_iter()
4482 .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
4483 .collect::<HashMap<_, _>>();
4484
4485 Some(Blame {
4486 entries,
4487 messages,
4488 remote_url: response.remote_url,
4489 })
4490}
4491
4492fn branch_to_proto(branch: &git::repository::Branch) -> proto::Branch {
4493 proto::Branch {
4494 is_head: branch.is_head,
4495 ref_name: branch.ref_name.to_string(),
4496 unix_timestamp: branch
4497 .most_recent_commit
4498 .as_ref()
4499 .map(|commit| commit.commit_timestamp as u64),
4500 upstream: branch.upstream.as_ref().map(|upstream| proto::GitUpstream {
4501 ref_name: upstream.ref_name.to_string(),
4502 tracking: upstream
4503 .tracking
4504 .status()
4505 .map(|upstream| proto::UpstreamTracking {
4506 ahead: upstream.ahead as u64,
4507 behind: upstream.behind as u64,
4508 }),
4509 }),
4510 most_recent_commit: branch
4511 .most_recent_commit
4512 .as_ref()
4513 .map(|commit| proto::CommitSummary {
4514 sha: commit.sha.to_string(),
4515 subject: commit.subject.to_string(),
4516 commit_timestamp: commit.commit_timestamp,
4517 }),
4518 }
4519}
4520
4521fn proto_to_branch(proto: &proto::Branch) -> git::repository::Branch {
4522 git::repository::Branch {
4523 is_head: proto.is_head,
4524 ref_name: proto.ref_name.clone().into(),
4525 upstream: proto
4526 .upstream
4527 .as_ref()
4528 .map(|upstream| git::repository::Upstream {
4529 ref_name: upstream.ref_name.to_string().into(),
4530 tracking: upstream
4531 .tracking
4532 .as_ref()
4533 .map(|tracking| {
4534 git::repository::UpstreamTracking::Tracked(UpstreamTrackingStatus {
4535 ahead: tracking.ahead as u32,
4536 behind: tracking.behind as u32,
4537 })
4538 })
4539 .unwrap_or(git::repository::UpstreamTracking::Gone),
4540 }),
4541 most_recent_commit: proto.most_recent_commit.as_ref().map(|commit| {
4542 git::repository::CommitSummary {
4543 sha: commit.sha.to_string().into(),
4544 subject: commit.subject.to_string().into(),
4545 commit_timestamp: commit.commit_timestamp,
4546 has_parent: true,
4547 }
4548 }),
4549 }
4550}
4551
4552fn commit_details_to_proto(commit: &CommitDetails) -> proto::GitCommitDetails {
4553 proto::GitCommitDetails {
4554 sha: commit.sha.to_string(),
4555 message: commit.message.to_string(),
4556 commit_timestamp: commit.commit_timestamp,
4557 author_email: commit.author_email.to_string(),
4558 author_name: commit.author_name.to_string(),
4559 }
4560}
4561
4562fn proto_to_commit_details(proto: &proto::GitCommitDetails) -> CommitDetails {
4563 CommitDetails {
4564 sha: proto.sha.clone().into(),
4565 message: proto.message.clone().into(),
4566 commit_timestamp: proto.commit_timestamp,
4567 author_email: proto.author_email.clone().into(),
4568 author_name: proto.author_name.clone().into(),
4569 }
4570}
4571
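/// Recomputes a `RepositorySnapshot` from the git backend: current branch,
/// head commit, per-path statuses, and merge details. Returns the snapshot
/// along with the `RepositoryEvent`s to emit: an `Updated` event when the
/// branch, statuses, or merge heads changed, plus `MergeHeadsChanged` when the
/// merge heads differ from the previous snapshot.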
4572async fn compute_snapshot(
4573 id: RepositoryId,
4574 work_directory_abs_path: Arc<Path>,
4575 prev_snapshot: RepositorySnapshot,
4576 backend: Arc<dyn GitRepository>,
4577) -> Result<(RepositorySnapshot, Vec<RepositoryEvent>)> {
4578 let mut events = Vec::new();
4579 let branches = backend.branches().await?;
4580 let branch = branches.into_iter().find(|branch| branch.is_head);
4581 let statuses = backend
4582 .status(std::slice::from_ref(&WORK_DIRECTORY_REPO_PATH))
4583 .await?;
4584 let statuses_by_path = SumTree::from_iter(
4585 statuses
4586 .entries
4587 .iter()
4588 .map(|(repo_path, status)| StatusEntry {
4589 repo_path: repo_path.clone(),
4590 status: *status,
4591 }),
4592 &(),
4593 );
4594 let (merge_details, merge_heads_changed) =
4595 MergeDetails::load(&backend, &statuses_by_path, &prev_snapshot).await?;
4596 log::debug!("new merge details (changed={merge_heads_changed:?}): {merge_details:?}");
4597
4598 if merge_heads_changed
4599 || branch != prev_snapshot.branch
4600 || statuses_by_path != prev_snapshot.statuses_by_path
4601 {
4602 events.push(RepositoryEvent::Updated {
4603 full_scan: true,
4604 new_instance: false,
4605 });
4606 }
4607
4608 // Merge conflict paths are cached so that staging or unstaging files doesn't
4609 // change them; they are only recomputed when the merge heads change (e.g. once the merge is committed).
4610 if merge_heads_changed {
4611 events.push(RepositoryEvent::MergeHeadsChanged);
4612 }
4613
4614 // Resolve the head commit directly from the backend; useful when `branch` is None, e.g. in a detached HEAD state.
4615 let head_commit = match backend.head_sha().await {
4616 Some(head_sha) => backend.show(head_sha).await.log_err(),
4617 None => None,
4618 };
4619
4620 let snapshot = RepositorySnapshot {
4621 id,
4622 statuses_by_path,
4623 work_directory_abs_path,
4624 scan_id: prev_snapshot.scan_id + 1,
4625 branch,
4626 head_commit,
4627 merge: merge_details,
4628 };
4629
4630 Ok((snapshot, events))
4631}
4632
4633fn status_from_proto(
4634 simple_status: i32,
4635 status: Option<proto::GitFileStatus>,
4636) -> anyhow::Result<FileStatus> {
4637 use proto::git_file_status::Variant;
4638
4639 let Some(variant) = status.and_then(|status| status.variant) else {
4640 let code = proto::GitStatus::from_i32(simple_status)
4641 .with_context(|| format!("Invalid git status code: {simple_status}"))?;
4642 let result = match code {
4643 proto::GitStatus::Added => TrackedStatus {
4644 worktree_status: StatusCode::Added,
4645 index_status: StatusCode::Unmodified,
4646 }
4647 .into(),
4648 proto::GitStatus::Modified => TrackedStatus {
4649 worktree_status: StatusCode::Modified,
4650 index_status: StatusCode::Unmodified,
4651 }
4652 .into(),
4653 proto::GitStatus::Conflict => UnmergedStatus {
4654 first_head: UnmergedStatusCode::Updated,
4655 second_head: UnmergedStatusCode::Updated,
4656 }
4657 .into(),
4658 proto::GitStatus::Deleted => TrackedStatus {
4659 worktree_status: StatusCode::Deleted,
4660 index_status: StatusCode::Unmodified,
4661 }
4662 .into(),
4663 _ => anyhow::bail!("Invalid code for simple status: {simple_status}"),
4664 };
4665 return Ok(result);
4666 };
4667
4668 let result = match variant {
4669 Variant::Untracked(_) => FileStatus::Untracked,
4670 Variant::Ignored(_) => FileStatus::Ignored,
4671 Variant::Unmerged(unmerged) => {
4672 let [first_head, second_head] =
4673 [unmerged.first_head, unmerged.second_head].map(|head| {
4674 let code = proto::GitStatus::from_i32(head)
4675 .with_context(|| format!("Invalid git status code: {head}"))?;
4676 let result = match code {
4677 proto::GitStatus::Added => UnmergedStatusCode::Added,
4678 proto::GitStatus::Updated => UnmergedStatusCode::Updated,
4679 proto::GitStatus::Deleted => UnmergedStatusCode::Deleted,
4680 _ => anyhow::bail!("Invalid code for unmerged status: {code:?}"),
4681 };
4682 Ok(result)
4683 });
4684 let [first_head, second_head] = [first_head?, second_head?];
4685 UnmergedStatus {
4686 first_head,
4687 second_head,
4688 }
4689 .into()
4690 }
4691 Variant::Tracked(tracked) => {
4692 let [index_status, worktree_status] = [tracked.index_status, tracked.worktree_status]
4693 .map(|status| {
4694 let code = proto::GitStatus::from_i32(status)
4695 .with_context(|| format!("Invalid git status code: {status}"))?;
4696 let result = match code {
4697 proto::GitStatus::Modified => StatusCode::Modified,
4698 proto::GitStatus::TypeChanged => StatusCode::TypeChanged,
4699 proto::GitStatus::Added => StatusCode::Added,
4700 proto::GitStatus::Deleted => StatusCode::Deleted,
4701 proto::GitStatus::Renamed => StatusCode::Renamed,
4702 proto::GitStatus::Copied => StatusCode::Copied,
4703 proto::GitStatus::Unmodified => StatusCode::Unmodified,
4704 _ => anyhow::bail!("Invalid code for tracked status: {code:?}"),
4705 };
4706 Ok(result)
4707 });
4708 let [index_status, worktree_status] = [index_status?, worktree_status?];
4709 TrackedStatus {
4710 index_status,
4711 worktree_status,
4712 }
4713 .into()
4714 }
4715 };
4716 Ok(result)
4717}
4718
4719fn status_to_proto(status: FileStatus) -> proto::GitFileStatus {
4720 use proto::git_file_status::{Tracked, Unmerged, Variant};
4721
4722 let variant = match status {
4723 FileStatus::Untracked => Variant::Untracked(Default::default()),
4724 FileStatus::Ignored => Variant::Ignored(Default::default()),
4725 FileStatus::Unmerged(UnmergedStatus {
4726 first_head,
4727 second_head,
4728 }) => Variant::Unmerged(Unmerged {
4729 first_head: unmerged_status_to_proto(first_head),
4730 second_head: unmerged_status_to_proto(second_head),
4731 }),
4732 FileStatus::Tracked(TrackedStatus {
4733 index_status,
4734 worktree_status,
4735 }) => Variant::Tracked(Tracked {
4736 index_status: tracked_status_to_proto(index_status),
4737 worktree_status: tracked_status_to_proto(worktree_status),
4738 }),
4739 };
4740 proto::GitFileStatus {
4741 variant: Some(variant),
4742 }
4743}
4744
4745fn unmerged_status_to_proto(code: UnmergedStatusCode) -> i32 {
4746 match code {
4747 UnmergedStatusCode::Added => proto::GitStatus::Added as _,
4748 UnmergedStatusCode::Deleted => proto::GitStatus::Deleted as _,
4749 UnmergedStatusCode::Updated => proto::GitStatus::Updated as _,
4750 }
4751}
4752
4753fn tracked_status_to_proto(code: StatusCode) -> i32 {
4754 match code {
4755 StatusCode::Added => proto::GitStatus::Added as _,
4756 StatusCode::Deleted => proto::GitStatus::Deleted as _,
4757 StatusCode::Modified => proto::GitStatus::Modified as _,
4758 StatusCode::Renamed => proto::GitStatus::Renamed as _,
4759 StatusCode::TypeChanged => proto::GitStatus::TypeChanged as _,
4760 StatusCode::Copied => proto::GitStatus::Copied as _,
4761 StatusCode::Unmodified => proto::GitStatus::Unmodified as _,
4762 }
4763}