1mod conflict_set;
2pub mod git_traversal;
3
4use crate::{
5 ProjectEnvironment, ProjectItem, ProjectPath,
6 buffer_store::{BufferStore, BufferStoreEvent},
7 worktree_store::{WorktreeStore, WorktreeStoreEvent},
8};
9use anyhow::{Context as _, Result, anyhow, bail};
10use askpass::AskPassDelegate;
11use buffer_diff::{BufferDiff, BufferDiffEvent};
12use client::ProjectId;
13use collections::HashMap;
14pub use conflict_set::{ConflictRegion, ConflictSet, ConflictSetSnapshot, ConflictSetUpdate};
15use fs::Fs;
16use futures::{
17 FutureExt, StreamExt as _,
18 channel::{mpsc, oneshot},
19 future::{self, Shared},
20};
21use git::{
22 BuildPermalinkParams, GitHostingProviderRegistry, WORK_DIRECTORY_REPO_PATH,
23 blame::Blame,
24 parse_git_remote_url,
25 repository::{
26 Branch, CommitDetails, CommitDiff, CommitFile, CommitOptions, DiffType, FetchOptions,
27 GitRepository, GitRepositoryCheckpoint, PushOptions, Remote, RemoteCommandOutput, RepoPath,
28 ResetMode, UpstreamTrackingStatus,
29 },
30 status::{
31 FileStatus, GitSummary, StatusCode, TrackedStatus, UnmergedStatus, UnmergedStatusCode,
32 },
33};
34use gpui::{
35 App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Subscription, Task,
36 WeakEntity,
37};
38use language::{
39 Buffer, BufferEvent, Language, LanguageRegistry,
40 proto::{deserialize_version, serialize_version},
41};
42use parking_lot::Mutex;
43use postage::stream::Stream as _;
44use rpc::{
45 AnyProtoClient, TypedEnvelope,
46 proto::{self, FromProto, SSH_PROJECT_ID, ToProto, git_reset, split_repository_update},
47};
48use serde::Deserialize;
49use std::{
50 cmp::Ordering,
51 collections::{BTreeSet, VecDeque},
52 future::Future,
53 mem,
54 ops::Range,
55 path::{Path, PathBuf},
56 sync::{
57 Arc,
58 atomic::{self, AtomicU64},
59 },
60 time::Instant,
61};
62use sum_tree::{Edit, SumTree, TreeSet};
63use text::{Bias, BufferId};
64use util::{ResultExt, debug_panic, post_inc};
65use worktree::{
66 File, PathKey, PathProgress, PathSummary, PathTarget, UpdatedGitRepositoriesSet,
67 UpdatedGitRepository, Worktree,
68};
69
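/// Tracks the Git repositories discovered in a project's worktrees, along with
/// the per-buffer diff and conflict-marker state derived from them. Exists in
/// local, SSH, and remote (collaboration) flavors; see [`GitStoreState`].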
70pub struct GitStore {
71 state: GitStoreState,
72 buffer_store: Entity<BufferStore>,
73 worktree_store: Entity<WorktreeStore>,
74 repositories: HashMap<RepositoryId, Entity<Repository>>,
75 active_repo_id: Option<RepositoryId>,
76 #[allow(clippy::type_complexity)]
77 loading_diffs:
78 HashMap<(BufferId, DiffKind), Shared<Task<Result<Entity<BufferDiff>, Arc<anyhow::Error>>>>>,
79 diffs: HashMap<BufferId, Entity<BufferGitState>>,
80 shared_diffs: HashMap<proto::PeerId, HashMap<BufferId, SharedDiffs>>,
81 _subscriptions: Vec<Subscription>,
82}
83
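/// Strong handles to the diffs that have been shared with a remote peer for a
/// single buffer.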
84#[derive(Default)]
85struct SharedDiffs {
86 unstaged: Option<Entity<BufferDiff>>,
87 uncommitted: Option<Entity<BufferDiff>>,
88}
89
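/// Git-related state tracked per open buffer: weak handles to its unstaged and
/// uncommitted diffs and its conflict set, plus the bookkeeping needed to keep
/// them up to date as the buffer and repository change.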
90struct BufferGitState {
91 unstaged_diff: Option<WeakEntity<BufferDiff>>,
92 uncommitted_diff: Option<WeakEntity<BufferDiff>>,
93 conflict_set: Option<WeakEntity<ConflictSet>>,
94 recalculate_diff_task: Option<Task<Result<()>>>,
95 reparse_conflict_markers_task: Option<Task<Result<()>>>,
96 language: Option<Arc<Language>>,
97 language_registry: Option<Arc<LanguageRegistry>>,
98 conflict_updated_futures: Vec<oneshot::Sender<()>>,
99 recalculating_tx: postage::watch::Sender<bool>,
100
101 /// These operation counts are used to ensure that head and index text
102 /// values read from the git repository are up-to-date with any hunk staging
103 /// operations that have been performed on the BufferDiff.
104 ///
105 /// The operation count is incremented immediately when the user initiates a
106 /// hunk stage/unstage operation. Then, upon finishing writing the new index
    /// text to disk, the `operation count as of write` is updated to reflect
108 /// the operation count that prompted the write.
109 hunk_staging_operation_count: usize,
110 hunk_staging_operation_count_as_of_write: usize,
111
112 head_text: Option<Arc<String>>,
113 index_text: Option<Arc<String>>,
114 head_changed: bool,
115 index_changed: bool,
116 language_changed: bool,
117}
118
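/// Describes which of a buffer's diff base texts changed: the index, the HEAD
/// text, both independently, or both to the same value.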
119#[derive(Clone, Debug)]
120enum DiffBasesChange {
121 SetIndex(Option<String>),
122 SetHead(Option<String>),
123 SetEach {
124 index: Option<String>,
125 head: Option<String>,
126 },
127 SetBoth(Option<String>),
128}
129
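/// The two kinds of diffs tracked for a buffer: unstaged (buffer vs. index) and
/// uncommitted (buffer vs. HEAD).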
130#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
131enum DiffKind {
132 Unstaged,
133 Uncommitted,
134}
135
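/// How this store accesses Git: directly against the local filesystem, via an
/// SSH remote, or as a collaborator forwarding requests to an upstream client.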
136enum GitStoreState {
137 Local {
138 next_repository_id: Arc<AtomicU64>,
139 downstream: Option<LocalDownstreamState>,
140 project_environment: Entity<ProjectEnvironment>,
141 fs: Arc<dyn Fs>,
142 },
143 Ssh {
144 upstream_client: AnyProtoClient,
145 upstream_project_id: ProjectId,
146 downstream: Option<(AnyProtoClient, ProjectId)>,
147 },
148 Remote {
149 upstream_client: AnyProtoClient,
150 upstream_project_id: ProjectId,
151 },
152}
153
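/// An update queued for forwarding to downstream collaborators while a local
/// project is shared.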
154enum DownstreamUpdate {
155 UpdateRepository(RepositorySnapshot),
156 RemoveRepository(RepositoryId),
157}
158
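/// State held while a local project is shared: the downstream client and the
/// background task that forwards repository updates to it.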
159struct LocalDownstreamState {
160 client: AnyProtoClient,
161 project_id: ProjectId,
162 updates_tx: mpsc::UnboundedSender<DownstreamUpdate>,
163 _task: Task<Result<()>>,
164}
165
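/// A checkpoint of every repository in the store, keyed by the repository's
/// working-directory path.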
166#[derive(Clone, Debug)]
167pub struct GitStoreCheckpoint {
168 checkpoints_by_work_dir_abs_path: HashMap<Arc<Path>, GitRepositoryCheckpoint>,
169}
170
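/// A repository-relative path together with its Git status.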
171#[derive(Clone, Debug, PartialEq, Eq)]
172pub struct StatusEntry {
173 pub repo_path: RepoPath,
174 pub status: FileStatus,
175}
176
177impl StatusEntry {
178 fn to_proto(&self) -> proto::StatusEntry {
179 let simple_status = match self.status {
180 FileStatus::Ignored | FileStatus::Untracked => proto::GitStatus::Added as i32,
181 FileStatus::Unmerged { .. } => proto::GitStatus::Conflict as i32,
182 FileStatus::Tracked(TrackedStatus {
183 index_status,
184 worktree_status,
185 }) => tracked_status_to_proto(if worktree_status != StatusCode::Unmodified {
186 worktree_status
187 } else {
188 index_status
189 }),
190 };
191
192 proto::StatusEntry {
193 repo_path: self.repo_path.as_ref().to_proto(),
194 simple_status,
195 status: Some(status_to_proto(self.status)),
196 }
197 }
198}
199
200impl TryFrom<proto::StatusEntry> for StatusEntry {
201 type Error = anyhow::Error;
202
203 fn try_from(value: proto::StatusEntry) -> Result<Self, Self::Error> {
204 let repo_path = RepoPath(Arc::<Path>::from_proto(value.repo_path));
205 let status = status_from_proto(value.simple_status, value.status)?;
206 Ok(Self { repo_path, status })
207 }
208}
209
210impl sum_tree::Item for StatusEntry {
211 type Summary = PathSummary<GitSummary>;
212
213 fn summary(&self, _: &<Self::Summary as sum_tree::Summary>::Context) -> Self::Summary {
214 PathSummary {
215 max_path: self.repo_path.0.clone(),
216 item_summary: self.status.summary(),
217 }
218 }
219}
220
221impl sum_tree::KeyedItem for StatusEntry {
222 type Key = PathKey;
223
224 fn key(&self) -> Self::Key {
225 PathKey(self.repo_path.0.clone())
226 }
227}
228
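/// A unique identifier assigned to each repository tracked by a [`GitStore`].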
229#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
230pub struct RepositoryId(pub u64);
231
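/// Details of an in-progress merge: the paths currently in conflict, the
/// pending merge message, and the merge heads.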
232#[derive(Clone, Debug, Default, PartialEq, Eq)]
233pub struct MergeDetails {
234 pub conflicted_paths: TreeSet<RepoPath>,
235 pub message: Option<SharedString>,
236 pub heads: Vec<Option<SharedString>>,
237}
238
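/// An immutable view of a repository's state: statuses by path, the current
/// branch and head commit, and details of any in-progress merge.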
239#[derive(Clone, Debug, PartialEq, Eq)]
240pub struct RepositorySnapshot {
241 pub id: RepositoryId,
242 pub statuses_by_path: SumTree<StatusEntry>,
243 pub work_directory_abs_path: Arc<Path>,
244 pub branch: Option<Branch>,
245 pub head_commit: Option<CommitDetails>,
246 pub scan_id: u64,
247 pub merge: MergeDetails,
248}
249
250type JobId = u64;
251
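/// Metadata about a Git job that is currently running: when it started and a
/// human-readable description of what it is doing.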
252#[derive(Clone, Debug, PartialEq, Eq)]
253pub struct JobInfo {
254 pub start: Instant,
255 pub message: SharedString,
256}
257
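/// A single Git repository tracked by the [`GitStore`]. Dereferences to its
/// latest [`RepositorySnapshot`] and runs Git operations through a queue of
/// [`GitJob`]s.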
258pub struct Repository {
259 this: WeakEntity<Self>,
260 snapshot: RepositorySnapshot,
261 commit_message_buffer: Option<Entity<Buffer>>,
262 git_store: WeakEntity<GitStore>,
263 // For a local repository, holds paths that have had worktree events since the last status scan completed,
264 // and that should be examined during the next status scan.
265 paths_needing_status_update: BTreeSet<RepoPath>,
266 job_sender: mpsc::UnboundedSender<GitJob>,
267 active_jobs: HashMap<JobId, JobInfo>,
268 job_id: JobId,
269 askpass_delegates: Arc<Mutex<HashMap<u64, AskPassDelegate>>>,
270 latest_askpass_id: u64,
271}
272
273impl std::ops::Deref for Repository {
274 type Target = RepositorySnapshot;
275
276 fn deref(&self) -> &Self::Target {
277 &self.snapshot
278 }
279}
280
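/// The backend a repository's jobs run against: a local Git implementation and
/// its environment, or a remote project reached through an RPC client.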
281#[derive(Clone)]
282pub enum RepositoryState {
283 Local {
284 backend: Arc<dyn GitRepository>,
285 environment: Arc<HashMap<String, String>>,
286 },
287 Remote {
288 project_id: ProjectId,
289 client: AnyProtoClient,
290 },
291}
292
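/// Events emitted by a [`Repository`] when its snapshot or merge heads change.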
293#[derive(Clone, Debug)]
294pub enum RepositoryEvent {
295 Updated { full_scan: bool, new_instance: bool },
296 MergeHeadsChanged,
297}
298
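/// Emitted by a [`Repository`] when its set of active jobs changes.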
299#[derive(Clone, Debug)]
300pub struct JobsUpdated;
301
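/// Events emitted by the [`GitStore`].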
302#[derive(Debug)]
303pub enum GitStoreEvent {
304 ActiveRepositoryChanged(Option<RepositoryId>),
305 RepositoryUpdated(RepositoryId, RepositoryEvent, bool),
306 RepositoryAdded(RepositoryId),
307 RepositoryRemoved(RepositoryId),
308 IndexWriteError(anyhow::Error),
309 JobsUpdated,
310 ConflictsUpdated,
311}
312
313impl EventEmitter<RepositoryEvent> for Repository {}
314impl EventEmitter<JobsUpdated> for Repository {}
315impl EventEmitter<GitStoreEvent> for GitStore {}
316
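/// A unit of work to run against a repository's backend, with an optional key
/// identifying the kind of job.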
317pub struct GitJob {
318 job: Box<dyn FnOnce(RepositoryState, &mut AsyncApp) -> Task<()>>,
319 key: Option<GitJobKey>,
320}
321
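/// Identifies the class of a queued [`GitJob`], such as an index write for a
/// particular path.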
322#[derive(PartialEq, Eq)]
323enum GitJobKey {
324 WriteIndex(RepoPath),
325 ReloadBufferDiffBases,
326 RefreshStatuses,
327 ReloadGitState,
328}
329
330impl GitStore {
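    /// Creates a store for a local project, operating directly on the filesystem.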
331 pub fn local(
332 worktree_store: &Entity<WorktreeStore>,
333 buffer_store: Entity<BufferStore>,
334 environment: Entity<ProjectEnvironment>,
335 fs: Arc<dyn Fs>,
336 cx: &mut Context<Self>,
337 ) -> Self {
338 Self::new(
339 worktree_store.clone(),
340 buffer_store,
341 GitStoreState::Local {
342 next_repository_id: Arc::new(AtomicU64::new(1)),
343 downstream: None,
344 project_environment: environment,
345 fs,
346 },
347 cx,
348 )
349 }
350
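    /// Creates a store for a project joined as a collaborator; Git operations are
    /// forwarded to the upstream client.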
351 pub fn remote(
352 worktree_store: &Entity<WorktreeStore>,
353 buffer_store: Entity<BufferStore>,
354 upstream_client: AnyProtoClient,
355 project_id: ProjectId,
356 cx: &mut Context<Self>,
357 ) -> Self {
358 Self::new(
359 worktree_store.clone(),
360 buffer_store,
361 GitStoreState::Remote {
362 upstream_client,
363 upstream_project_id: project_id,
364 },
365 cx,
366 )
367 }
368
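    /// Creates a store for a project opened over SSH; Git operations are forwarded
    /// to the remote host, and the store can itself be shared downstream.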
369 pub fn ssh(
370 worktree_store: &Entity<WorktreeStore>,
371 buffer_store: Entity<BufferStore>,
372 upstream_client: AnyProtoClient,
373 cx: &mut Context<Self>,
374 ) -> Self {
375 Self::new(
376 worktree_store.clone(),
377 buffer_store,
378 GitStoreState::Ssh {
379 upstream_client,
380 upstream_project_id: ProjectId(SSH_PROJECT_ID),
381 downstream: None,
382 },
383 cx,
384 )
385 }
386
387 fn new(
388 worktree_store: Entity<WorktreeStore>,
389 buffer_store: Entity<BufferStore>,
390 state: GitStoreState,
391 cx: &mut Context<Self>,
392 ) -> Self {
393 let _subscriptions = vec![
394 cx.subscribe(&worktree_store, Self::on_worktree_store_event),
395 cx.subscribe(&buffer_store, Self::on_buffer_store_event),
396 ];
397
398 GitStore {
399 state,
400 buffer_store,
401 worktree_store,
402 repositories: HashMap::default(),
403 active_repo_id: None,
404 _subscriptions,
405 loading_diffs: HashMap::default(),
406 shared_diffs: HashMap::default(),
407 diffs: HashMap::default(),
408 }
409 }
410
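    /// Registers the RPC handlers for Git requests and messages on the given client.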
411 pub fn init(client: &AnyProtoClient) {
412 client.add_entity_request_handler(Self::handle_get_remotes);
413 client.add_entity_request_handler(Self::handle_get_branches);
414 client.add_entity_request_handler(Self::handle_change_branch);
415 client.add_entity_request_handler(Self::handle_create_branch);
416 client.add_entity_request_handler(Self::handle_git_init);
417 client.add_entity_request_handler(Self::handle_push);
418 client.add_entity_request_handler(Self::handle_pull);
419 client.add_entity_request_handler(Self::handle_fetch);
420 client.add_entity_request_handler(Self::handle_stage);
421 client.add_entity_request_handler(Self::handle_unstage);
422 client.add_entity_request_handler(Self::handle_commit);
423 client.add_entity_request_handler(Self::handle_reset);
424 client.add_entity_request_handler(Self::handle_show);
425 client.add_entity_request_handler(Self::handle_load_commit_diff);
426 client.add_entity_request_handler(Self::handle_checkout_files);
427 client.add_entity_request_handler(Self::handle_open_commit_message_buffer);
428 client.add_entity_request_handler(Self::handle_set_index_text);
429 client.add_entity_request_handler(Self::handle_askpass);
430 client.add_entity_request_handler(Self::handle_check_for_pushed_commits);
431 client.add_entity_request_handler(Self::handle_git_diff);
432 client.add_entity_request_handler(Self::handle_open_unstaged_diff);
433 client.add_entity_request_handler(Self::handle_open_uncommitted_diff);
434 client.add_entity_message_handler(Self::handle_update_diff_bases);
435 client.add_entity_request_handler(Self::handle_get_permalink_to_line);
436 client.add_entity_request_handler(Self::handle_blame_buffer);
437 client.add_entity_message_handler(Self::handle_update_repository);
438 client.add_entity_message_handler(Self::handle_remove_repository);
439 }
440
441 pub fn is_local(&self) -> bool {
442 matches!(self.state, GitStoreState::Local { .. })
443 }
444
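    /// Starts sharing repository state with a downstream client, sending an initial
    /// snapshot of each repository followed by incremental updates.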
445 pub fn shared(&mut self, project_id: u64, client: AnyProtoClient, cx: &mut Context<Self>) {
446 match &mut self.state {
447 GitStoreState::Ssh {
448 downstream: downstream_client,
449 ..
450 } => {
451 for repo in self.repositories.values() {
452 let update = repo.read(cx).snapshot.initial_update(project_id);
453 for update in split_repository_update(update) {
454 client.send(update).log_err();
455 }
456 }
457 *downstream_client = Some((client, ProjectId(project_id)));
458 }
459 GitStoreState::Local {
460 downstream: downstream_client,
461 ..
462 } => {
463 let mut snapshots = HashMap::default();
464 let (updates_tx, mut updates_rx) = mpsc::unbounded();
465 for repo in self.repositories.values() {
466 updates_tx
467 .unbounded_send(DownstreamUpdate::UpdateRepository(
468 repo.read(cx).snapshot.clone(),
469 ))
470 .ok();
471 }
472 *downstream_client = Some(LocalDownstreamState {
473 client: client.clone(),
474 project_id: ProjectId(project_id),
475 updates_tx,
476 _task: cx.spawn(async move |this, cx| {
477 cx.background_spawn(async move {
478 while let Some(update) = updates_rx.next().await {
479 match update {
480 DownstreamUpdate::UpdateRepository(snapshot) => {
481 if let Some(old_snapshot) = snapshots.get_mut(&snapshot.id)
482 {
483 let update =
484 snapshot.build_update(old_snapshot, project_id);
485 *old_snapshot = snapshot;
486 for update in split_repository_update(update) {
487 client.send(update)?;
488 }
489 } else {
490 let update = snapshot.initial_update(project_id);
491 for update in split_repository_update(update) {
492 client.send(update)?;
493 }
494 snapshots.insert(snapshot.id, snapshot);
495 }
496 }
497 DownstreamUpdate::RemoveRepository(id) => {
498 client.send(proto::RemoveRepository {
499 project_id,
500 id: id.to_proto(),
501 })?;
502 }
503 }
504 }
505 anyhow::Ok(())
506 })
507 .await
508 .ok();
509 this.update(cx, |this, _| {
510 if let GitStoreState::Local {
511 downstream: downstream_client,
512 ..
513 } = &mut this.state
514 {
515 downstream_client.take();
516 } else {
517 unreachable!("unshared called on remote store");
518 }
519 })
520 }),
521 });
522 }
523 GitStoreState::Remote { .. } => {
524 debug_panic!("shared called on remote store");
525 }
526 }
527 }
528
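    /// Stops sharing repository state with the downstream client and drops any
    /// diffs that were shared with peers.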
529 pub fn unshared(&mut self, _cx: &mut Context<Self>) {
530 match &mut self.state {
531 GitStoreState::Local {
532 downstream: downstream_client,
533 ..
534 } => {
535 downstream_client.take();
536 }
537 GitStoreState::Ssh {
538 downstream: downstream_client,
539 ..
540 } => {
541 downstream_client.take();
542 }
543 GitStoreState::Remote { .. } => {
544 debug_panic!("unshared called on remote store");
545 }
546 }
547 self.shared_diffs.clear();
548 }
549
550 pub(crate) fn forget_shared_diffs_for(&mut self, peer_id: &proto::PeerId) {
551 self.shared_diffs.remove(peer_id);
552 }
553
554 pub fn active_repository(&self) -> Option<Entity<Repository>> {
555 self.active_repo_id
556 .as_ref()
557 .map(|id| self.repositories[&id].clone())
558 }
559
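    /// Returns the buffer's diff against the index, loading the staged text and
    /// creating the diff if one is not already open.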
560 pub fn open_unstaged_diff(
561 &mut self,
562 buffer: Entity<Buffer>,
563 cx: &mut Context<Self>,
564 ) -> Task<Result<Entity<BufferDiff>>> {
565 let buffer_id = buffer.read(cx).remote_id();
566 if let Some(diff_state) = self.diffs.get(&buffer_id) {
567 if let Some(unstaged_diff) = diff_state
568 .read(cx)
569 .unstaged_diff
570 .as_ref()
571 .and_then(|weak| weak.upgrade())
572 {
573 if let Some(task) =
574 diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
575 {
576 return cx.background_executor().spawn(async move {
577 task.await;
578 Ok(unstaged_diff)
579 });
580 }
581 return Task::ready(Ok(unstaged_diff));
582 }
583 }
584
585 let Some((repo, repo_path)) =
586 self.repository_and_path_for_buffer_id(buffer.read(cx).remote_id(), cx)
587 else {
588 return Task::ready(Err(anyhow!("failed to find git repository for buffer")));
589 };
590
591 let task = self
592 .loading_diffs
593 .entry((buffer_id, DiffKind::Unstaged))
594 .or_insert_with(|| {
595 let staged_text = repo.update(cx, |repo, cx| {
596 repo.load_staged_text(buffer_id, repo_path, cx)
597 });
598 cx.spawn(async move |this, cx| {
599 Self::open_diff_internal(
600 this,
601 DiffKind::Unstaged,
602 staged_text.await.map(DiffBasesChange::SetIndex),
603 buffer,
604 cx,
605 )
606 .await
607 .map_err(Arc::new)
608 })
609 .shared()
610 })
611 .clone();
612
613 cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
614 }
615
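    /// Returns the buffer's diff against HEAD, creating it (along with a secondary
    /// unstaged diff) if one is not already open.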
616 pub fn open_uncommitted_diff(
617 &mut self,
618 buffer: Entity<Buffer>,
619 cx: &mut Context<Self>,
620 ) -> Task<Result<Entity<BufferDiff>>> {
621 let buffer_id = buffer.read(cx).remote_id();
622
623 if let Some(diff_state) = self.diffs.get(&buffer_id) {
624 if let Some(uncommitted_diff) = diff_state
625 .read(cx)
626 .uncommitted_diff
627 .as_ref()
628 .and_then(|weak| weak.upgrade())
629 {
630 if let Some(task) =
631 diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
632 {
633 return cx.background_executor().spawn(async move {
634 task.await;
635 Ok(uncommitted_diff)
636 });
637 }
638 return Task::ready(Ok(uncommitted_diff));
639 }
640 }
641
642 let Some((repo, repo_path)) =
643 self.repository_and_path_for_buffer_id(buffer.read(cx).remote_id(), cx)
644 else {
645 return Task::ready(Err(anyhow!("failed to find git repository for buffer")));
646 };
647
648 let task = self
649 .loading_diffs
650 .entry((buffer_id, DiffKind::Uncommitted))
651 .or_insert_with(|| {
652 let changes = repo.update(cx, |repo, cx| {
653 repo.load_committed_text(buffer_id, repo_path, cx)
654 });
655
656 cx.spawn(async move |this, cx| {
657 Self::open_diff_internal(this, DiffKind::Uncommitted, changes.await, buffer, cx)
658 .await
659 .map_err(Arc::new)
660 })
661 .shared()
662 })
663 .clone();
664
665 cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
666 }
667
668 async fn open_diff_internal(
669 this: WeakEntity<Self>,
670 kind: DiffKind,
671 texts: Result<DiffBasesChange>,
672 buffer_entity: Entity<Buffer>,
673 cx: &mut AsyncApp,
674 ) -> Result<Entity<BufferDiff>> {
675 let diff_bases_change = match texts {
676 Err(e) => {
677 this.update(cx, |this, cx| {
678 let buffer = buffer_entity.read(cx);
679 let buffer_id = buffer.remote_id();
680 this.loading_diffs.remove(&(buffer_id, kind));
681 })?;
682 return Err(e);
683 }
684 Ok(change) => change,
685 };
686
687 this.update(cx, |this, cx| {
688 let buffer = buffer_entity.read(cx);
689 let buffer_id = buffer.remote_id();
690 let language = buffer.language().cloned();
691 let language_registry = buffer.language_registry();
692 let text_snapshot = buffer.text_snapshot();
693 this.loading_diffs.remove(&(buffer_id, kind));
694
695 let git_store = cx.weak_entity();
696 let diff_state = this
697 .diffs
698 .entry(buffer_id)
699 .or_insert_with(|| cx.new(|_| BufferGitState::new(git_store)));
700
701 let diff = cx.new(|cx| BufferDiff::new(&text_snapshot, cx));
702
703 cx.subscribe(&diff, Self::on_buffer_diff_event).detach();
704 diff_state.update(cx, |diff_state, cx| {
705 diff_state.language = language;
706 diff_state.language_registry = language_registry;
707
708 match kind {
709 DiffKind::Unstaged => diff_state.unstaged_diff = Some(diff.downgrade()),
710 DiffKind::Uncommitted => {
711 let unstaged_diff = if let Some(diff) = diff_state.unstaged_diff() {
712 diff
713 } else {
714 let unstaged_diff = cx.new(|cx| BufferDiff::new(&text_snapshot, cx));
715 diff_state.unstaged_diff = Some(unstaged_diff.downgrade());
716 unstaged_diff
717 };
718
719 diff.update(cx, |diff, _| diff.set_secondary_diff(unstaged_diff));
720 diff_state.uncommitted_diff = Some(diff.downgrade())
721 }
722 }
723
724 diff_state.diff_bases_changed(text_snapshot, Some(diff_bases_change), cx);
725 let rx = diff_state.wait_for_recalculation();
726
727 anyhow::Ok(async move {
728 if let Some(rx) = rx {
729 rx.await;
730 }
731 Ok(diff)
732 })
733 })
734 })??
735 .await
736 }
737
738 pub fn get_unstaged_diff(&self, buffer_id: BufferId, cx: &App) -> Option<Entity<BufferDiff>> {
739 let diff_state = self.diffs.get(&buffer_id)?;
740 diff_state.read(cx).unstaged_diff.as_ref()?.upgrade()
741 }
742
743 pub fn get_uncommitted_diff(
744 &self,
745 buffer_id: BufferId,
746 cx: &App,
747 ) -> Option<Entity<BufferDiff>> {
748 let diff_state = self.diffs.get(&buffer_id)?;
749 diff_state.read(cx).uncommitted_diff.as_ref()?.upgrade()
750 }
751
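    /// Returns the conflict set for a buffer, creating it if necessary and
    /// scheduling a reparse of the buffer's conflict markers.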
752 pub fn open_conflict_set(
753 &mut self,
754 buffer: Entity<Buffer>,
755 cx: &mut Context<Self>,
756 ) -> Entity<ConflictSet> {
757 log::debug!("open conflict set");
758 let buffer_id = buffer.read(cx).remote_id();
759
760 if let Some(git_state) = self.diffs.get(&buffer_id) {
761 if let Some(conflict_set) = git_state
762 .read(cx)
763 .conflict_set
764 .as_ref()
765 .and_then(|weak| weak.upgrade())
766 {
767 let conflict_set = conflict_set.clone();
768 let buffer_snapshot = buffer.read(cx).text_snapshot();
769
770 git_state.update(cx, |state, cx| {
771 let _ = state.reparse_conflict_markers(buffer_snapshot, cx);
772 });
773
774 return conflict_set;
775 }
776 }
777
778 let is_unmerged = self
779 .repository_and_path_for_buffer_id(buffer_id, cx)
780 .map_or(false, |(repo, path)| {
781 repo.read(cx).snapshot.has_conflict(&path)
782 });
783 let git_store = cx.weak_entity();
784 let buffer_git_state = self
785 .diffs
786 .entry(buffer_id)
787 .or_insert_with(|| cx.new(|_| BufferGitState::new(git_store)));
788 let conflict_set = cx.new(|cx| ConflictSet::new(buffer_id, is_unmerged, cx));
789
790 self._subscriptions
791 .push(cx.subscribe(&conflict_set, |_, _, _, cx| {
792 cx.emit(GitStoreEvent::ConflictsUpdated);
793 }));
794
795 buffer_git_state.update(cx, |state, cx| {
796 state.conflict_set = Some(conflict_set.downgrade());
797 let buffer_snapshot = buffer.read(cx).text_snapshot();
798 let _ = state.reparse_conflict_markers(buffer_snapshot, cx);
799 });
800
801 conflict_set
802 }
803
804 pub fn project_path_git_status(
805 &self,
806 project_path: &ProjectPath,
807 cx: &App,
808 ) -> Option<FileStatus> {
809 let (repo, repo_path) = self.repository_and_path_for_project_path(project_path, cx)?;
810 Some(repo.read(cx).status_for_path(&repo_path)?.status)
811 }
812
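    /// Captures a checkpoint of every repository in the store.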
813 pub fn checkpoint(&self, cx: &mut App) -> Task<Result<GitStoreCheckpoint>> {
814 let mut work_directory_abs_paths = Vec::new();
815 let mut checkpoints = Vec::new();
816 for repository in self.repositories.values() {
817 repository.update(cx, |repository, _| {
818 work_directory_abs_paths.push(repository.snapshot.work_directory_abs_path.clone());
819 checkpoints.push(repository.checkpoint().map(|checkpoint| checkpoint?));
820 });
821 }
822
823 cx.background_executor().spawn(async move {
824 let checkpoints = future::try_join_all(checkpoints).await?;
825 Ok(GitStoreCheckpoint {
826 checkpoints_by_work_dir_abs_path: work_directory_abs_paths
827 .into_iter()
828 .zip(checkpoints)
829 .collect(),
830 })
831 })
832 }
833
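    /// Restores each repository to the state recorded in the given checkpoint,
    /// matching repositories by working-directory path.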
834 pub fn restore_checkpoint(
835 &self,
836 checkpoint: GitStoreCheckpoint,
837 cx: &mut App,
838 ) -> Task<Result<()>> {
839 let repositories_by_work_dir_abs_path = self
840 .repositories
841 .values()
842 .map(|repo| (repo.read(cx).snapshot.work_directory_abs_path.clone(), repo))
843 .collect::<HashMap<_, _>>();
844
845 let mut tasks = Vec::new();
846 for (work_dir_abs_path, checkpoint) in checkpoint.checkpoints_by_work_dir_abs_path {
847 if let Some(repository) = repositories_by_work_dir_abs_path.get(&work_dir_abs_path) {
848 let restore = repository.update(cx, |repository, _| {
849 repository.restore_checkpoint(checkpoint)
850 });
851 tasks.push(async move { restore.await? });
852 }
853 }
854 cx.background_spawn(async move {
855 future::try_join_all(tasks).await?;
856 Ok(())
857 })
858 }
859
860 /// Compares two checkpoints, returning true if they are equal.
861 pub fn compare_checkpoints(
862 &self,
863 left: GitStoreCheckpoint,
864 mut right: GitStoreCheckpoint,
865 cx: &mut App,
866 ) -> Task<Result<bool>> {
867 let repositories_by_work_dir_abs_path = self
868 .repositories
869 .values()
870 .map(|repo| (repo.read(cx).snapshot.work_directory_abs_path.clone(), repo))
871 .collect::<HashMap<_, _>>();
872
873 let mut tasks = Vec::new();
874 for (work_dir_abs_path, left_checkpoint) in left.checkpoints_by_work_dir_abs_path {
875 if let Some(right_checkpoint) = right
876 .checkpoints_by_work_dir_abs_path
877 .remove(&work_dir_abs_path)
878 {
879 if let Some(repository) = repositories_by_work_dir_abs_path.get(&work_dir_abs_path)
880 {
881 let compare = repository.update(cx, |repository, _| {
882 repository.compare_checkpoints(left_checkpoint, right_checkpoint)
883 });
884
885 tasks.push(async move { compare.await? });
886 }
887 } else {
888 return Task::ready(Ok(false));
889 }
890 }
891 cx.background_spawn(async move {
892 Ok(future::try_join_all(tasks)
893 .await?
894 .into_iter()
895 .all(|result| result))
896 })
897 }
898
899 /// Blames a buffer.
900 pub fn blame_buffer(
901 &self,
902 buffer: &Entity<Buffer>,
903 version: Option<clock::Global>,
904 cx: &mut App,
905 ) -> Task<Result<Option<Blame>>> {
906 let buffer = buffer.read(cx);
907 let Some((repo, repo_path)) =
908 self.repository_and_path_for_buffer_id(buffer.remote_id(), cx)
909 else {
910 return Task::ready(Err(anyhow!("failed to find a git repository for buffer")));
911 };
912 let content = match &version {
913 Some(version) => buffer.rope_for_version(version).clone(),
914 None => buffer.as_rope().clone(),
915 };
916 let version = version.unwrap_or(buffer.version());
917 let buffer_id = buffer.remote_id();
918
919 let rx = repo.update(cx, |repo, _| {
920 repo.send_job(None, move |state, _| async move {
921 match state {
922 RepositoryState::Local { backend, .. } => backend
923 .blame(repo_path.clone(), content)
924 .await
925 .with_context(|| format!("Failed to blame {:?}", repo_path.0))
926 .map(Some),
927 RepositoryState::Remote { project_id, client } => {
928 let response = client
929 .request(proto::BlameBuffer {
930 project_id: project_id.to_proto(),
931 buffer_id: buffer_id.into(),
932 version: serialize_version(&version),
933 })
934 .await?;
935 Ok(deserialize_blame_buffer_response(response))
936 }
937 }
938 })
939 });
940
941 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
942 }
943
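    /// Builds a permalink to the selected lines on the buffer's Git hosting
    /// provider, falling back to crate metadata for Rust files in the Cargo
    /// registry source.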
944 pub fn get_permalink_to_line(
945 &self,
946 buffer: &Entity<Buffer>,
947 selection: Range<u32>,
948 cx: &mut App,
949 ) -> Task<Result<url::Url>> {
950 let (worktree, path) = {
951 let buffer_ref = buffer.read(cx);
952 let Some(file) = File::from_dyn(buffer_ref.file()) else {
953 return Task::ready(Err(anyhow!("buffer has no file")));
954 };
955 (file.worktree.clone(), file.path.clone())
956 };
957
958 let worktree_id = worktree.read(cx).id();
959 let Some((repo, repo_path)) =
960 self.repository_and_path_for_project_path(&(worktree_id, path.clone()).into(), cx)
961 else {
962 // If we're not in a Git repo, check whether this is a Rust source
963 // file in the Cargo registry (presumably opened with go-to-definition
964 // from a normal Rust file). If so, we can put together a permalink
965 // using crate metadata.
966 if buffer
967 .read(cx)
968 .language()
969 .is_none_or(|lang| lang.name() != "Rust".into())
970 {
971 return Task::ready(Err(anyhow!("no permalink available")));
972 }
973 let Some(file_path) = worktree.read(cx).absolutize(&path).ok() else {
974 return Task::ready(Err(anyhow!("no permalink available")));
975 };
976 return cx.spawn(async move |cx| {
977 let provider_registry = cx.update(GitHostingProviderRegistry::default_global)?;
978 get_permalink_in_rust_registry_src(provider_registry, file_path, selection)
979 .context("no permalink available")
980 });
981
982 // TODO remote case
983 };
984
985 let buffer_id = buffer.read(cx).remote_id();
986 let branch = repo.read(cx).branch.clone();
987 let remote = branch
988 .as_ref()
989 .and_then(|b| b.upstream.as_ref())
990 .and_then(|b| b.remote_name())
991 .unwrap_or("origin")
992 .to_string();
993
994 let rx = repo.update(cx, |repo, _| {
995 repo.send_job(None, move |state, cx| async move {
996 match state {
997 RepositoryState::Local { backend, .. } => {
998 let origin_url = backend
999 .remote_url(&remote)
1000 .with_context(|| format!("remote \"{remote}\" not found"))?;
1001
1002 let sha = backend.head_sha().await.context("reading HEAD SHA")?;
1003
1004 let provider_registry =
1005 cx.update(GitHostingProviderRegistry::default_global)?;
1006
1007 let (provider, remote) =
1008 parse_git_remote_url(provider_registry, &origin_url)
1009 .context("parsing Git remote URL")?;
1010
1011 let path = repo_path.to_str().with_context(|| {
1012 format!("converting repo path {repo_path:?} to string")
1013 })?;
1014
1015 Ok(provider.build_permalink(
1016 remote,
1017 BuildPermalinkParams {
1018 sha: &sha,
1019 path,
1020 selection: Some(selection),
1021 },
1022 ))
1023 }
1024 RepositoryState::Remote { project_id, client } => {
1025 let response = client
1026 .request(proto::GetPermalinkToLine {
1027 project_id: project_id.to_proto(),
1028 buffer_id: buffer_id.into(),
1029 selection: Some(proto::Range {
1030 start: selection.start as u64,
1031 end: selection.end as u64,
1032 }),
1033 })
1034 .await?;
1035
1036 url::Url::parse(&response.permalink).context("failed to parse permalink")
1037 }
1038 }
1039 })
1040 });
1041 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
1042 }
1043
1044 fn downstream_client(&self) -> Option<(AnyProtoClient, ProjectId)> {
1045 match &self.state {
1046 GitStoreState::Local {
1047 downstream: downstream_client,
1048 ..
1049 } => downstream_client
1050 .as_ref()
1051 .map(|state| (state.client.clone(), state.project_id)),
1052 GitStoreState::Ssh {
1053 downstream: downstream_client,
1054 ..
1055 } => downstream_client.clone(),
1056 GitStoreState::Remote { .. } => None,
1057 }
1058 }
1059
1060 fn upstream_client(&self) -> Option<AnyProtoClient> {
1061 match &self.state {
1062 GitStoreState::Local { .. } => None,
1063 GitStoreState::Ssh {
1064 upstream_client, ..
1065 }
1066 | GitStoreState::Remote {
1067 upstream_client, ..
1068 } => Some(upstream_client.clone()),
1069 }
1070 }
1071
1072 fn on_worktree_store_event(
1073 &mut self,
1074 worktree_store: Entity<WorktreeStore>,
1075 event: &WorktreeStoreEvent,
1076 cx: &mut Context<Self>,
1077 ) {
1078 let GitStoreState::Local {
1079 project_environment,
1080 downstream,
1081 next_repository_id,
1082 fs,
1083 } = &self.state
1084 else {
1085 return;
1086 };
1087
1088 match event {
1089 WorktreeStoreEvent::WorktreeUpdatedEntries(worktree_id, updated_entries) => {
1090 let mut paths_by_git_repo = HashMap::<_, Vec<_>>::default();
1091 for (relative_path, _, _) in updated_entries.iter() {
1092 let Some((repo, repo_path)) = self.repository_and_path_for_project_path(
1093 &(*worktree_id, relative_path.clone()).into(),
1094 cx,
1095 ) else {
1096 continue;
1097 };
1098 paths_by_git_repo.entry(repo).or_default().push(repo_path)
1099 }
1100
1101 for (repo, paths) in paths_by_git_repo {
1102 repo.update(cx, |repo, cx| {
1103 repo.paths_changed(
1104 paths,
1105 downstream
1106 .as_ref()
1107 .map(|downstream| downstream.updates_tx.clone()),
1108 cx,
1109 );
1110 });
1111 }
1112 }
1113 WorktreeStoreEvent::WorktreeUpdatedGitRepositories(worktree_id, changed_repos) => {
1114 let Some(worktree) = worktree_store.read(cx).worktree_for_id(*worktree_id, cx)
1115 else {
1116 return;
1117 };
1118 if !worktree.read(cx).is_visible() {
1119 log::debug!(
1120 "not adding repositories for local worktree {:?} because it's not visible",
1121 worktree.read(cx).abs_path()
1122 );
1123 return;
1124 }
1125 self.update_repositories_from_worktree(
1126 project_environment.clone(),
1127 next_repository_id.clone(),
1128 downstream
1129 .as_ref()
1130 .map(|downstream| downstream.updates_tx.clone()),
1131 changed_repos.clone(),
1132 fs.clone(),
1133 cx,
1134 );
1135 self.local_worktree_git_repos_changed(worktree, changed_repos, cx);
1136 }
1137 _ => {}
1138 }
1139 }
1140
1141 fn on_repository_event(
1142 &mut self,
1143 repo: Entity<Repository>,
1144 event: &RepositoryEvent,
1145 cx: &mut Context<Self>,
1146 ) {
1147 let id = repo.read(cx).id;
1148 let repo_snapshot = repo.read(cx).snapshot.clone();
1149 for (buffer_id, diff) in self.diffs.iter() {
1150 if let Some((buffer_repo, repo_path)) =
1151 self.repository_and_path_for_buffer_id(*buffer_id, cx)
1152 {
1153 if buffer_repo == repo {
1154 diff.update(cx, |diff, cx| {
1155 if let Some(conflict_set) = &diff.conflict_set {
1156 let conflict_status_changed =
1157 conflict_set.update(cx, |conflict_set, cx| {
1158 let has_conflict = repo_snapshot.has_conflict(&repo_path);
1159 conflict_set.set_has_conflict(has_conflict, cx)
1160 })?;
1161 if conflict_status_changed {
1162 let buffer_store = self.buffer_store.read(cx);
1163 if let Some(buffer) = buffer_store.get(*buffer_id) {
1164 let _ = diff.reparse_conflict_markers(
1165 buffer.read(cx).text_snapshot(),
1166 cx,
1167 );
1168 }
1169 }
1170 }
1171 anyhow::Ok(())
1172 })
1173 .ok();
1174 }
1175 }
1176 }
1177 cx.emit(GitStoreEvent::RepositoryUpdated(
1178 id,
1179 event.clone(),
1180 self.active_repo_id == Some(id),
1181 ))
1182 }
1183
1184 fn on_jobs_updated(&mut self, _: Entity<Repository>, _: &JobsUpdated, cx: &mut Context<Self>) {
1185 cx.emit(GitStoreEvent::JobsUpdated)
1186 }
1187
    /// Updates our list of repositories and schedules git scans in response to a notification from a worktree.
1189 fn update_repositories_from_worktree(
1190 &mut self,
1191 project_environment: Entity<ProjectEnvironment>,
1192 next_repository_id: Arc<AtomicU64>,
1193 updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
1194 updated_git_repositories: UpdatedGitRepositoriesSet,
1195 fs: Arc<dyn Fs>,
1196 cx: &mut Context<Self>,
1197 ) {
1198 let mut removed_ids = Vec::new();
1199 for update in updated_git_repositories.iter() {
1200 if let Some((id, existing)) = self.repositories.iter().find(|(_, repo)| {
1201 let existing_work_directory_abs_path =
1202 repo.read(cx).work_directory_abs_path.clone();
1203 Some(&existing_work_directory_abs_path)
1204 == update.old_work_directory_abs_path.as_ref()
1205 || Some(&existing_work_directory_abs_path)
1206 == update.new_work_directory_abs_path.as_ref()
1207 }) {
1208 if let Some(new_work_directory_abs_path) =
1209 update.new_work_directory_abs_path.clone()
1210 {
1211 existing.update(cx, |existing, cx| {
1212 existing.snapshot.work_directory_abs_path = new_work_directory_abs_path;
1213 existing.schedule_scan(updates_tx.clone(), cx);
1214 });
1215 } else {
1216 removed_ids.push(*id);
1217 }
1218 } else if let UpdatedGitRepository {
1219 new_work_directory_abs_path: Some(work_directory_abs_path),
1220 dot_git_abs_path: Some(dot_git_abs_path),
1221 repository_dir_abs_path: Some(repository_dir_abs_path),
1222 common_dir_abs_path: Some(common_dir_abs_path),
1223 ..
1224 } = update
1225 {
1226 let id = RepositoryId(next_repository_id.fetch_add(1, atomic::Ordering::Release));
1227 let git_store = cx.weak_entity();
1228 let repo = cx.new(|cx| {
1229 let mut repo = Repository::local(
1230 id,
1231 work_directory_abs_path.clone(),
1232 dot_git_abs_path.clone(),
1233 repository_dir_abs_path.clone(),
1234 common_dir_abs_path.clone(),
1235 project_environment.downgrade(),
1236 fs.clone(),
1237 git_store,
1238 cx,
1239 );
1240 repo.schedule_scan(updates_tx.clone(), cx);
1241 repo
1242 });
1243 self._subscriptions
1244 .push(cx.subscribe(&repo, Self::on_repository_event));
1245 self._subscriptions
1246 .push(cx.subscribe(&repo, Self::on_jobs_updated));
1247 self.repositories.insert(id, repo);
1248 cx.emit(GitStoreEvent::RepositoryAdded(id));
1249 self.active_repo_id.get_or_insert_with(|| {
1250 cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
1251 id
1252 });
1253 }
1254 }
1255
1256 for id in removed_ids {
1257 if self.active_repo_id == Some(id) {
1258 self.active_repo_id = None;
1259 cx.emit(GitStoreEvent::ActiveRepositoryChanged(None));
1260 }
1261 self.repositories.remove(&id);
1262 if let Some(updates_tx) = updates_tx.as_ref() {
1263 updates_tx
1264 .unbounded_send(DownstreamUpdate::RemoveRepository(id))
1265 .ok();
1266 }
1267 }
1268 }
1269
1270 fn on_buffer_store_event(
1271 &mut self,
1272 _: Entity<BufferStore>,
1273 event: &BufferStoreEvent,
1274 cx: &mut Context<Self>,
1275 ) {
1276 match event {
1277 BufferStoreEvent::BufferAdded(buffer) => {
1278 cx.subscribe(&buffer, |this, buffer, event, cx| {
1279 if let BufferEvent::LanguageChanged = event {
1280 let buffer_id = buffer.read(cx).remote_id();
1281 if let Some(diff_state) = this.diffs.get(&buffer_id) {
1282 diff_state.update(cx, |diff_state, cx| {
1283 diff_state.buffer_language_changed(buffer, cx);
1284 });
1285 }
1286 }
1287 })
1288 .detach();
1289 }
1290 BufferStoreEvent::SharedBufferClosed(peer_id, buffer_id) => {
1291 if let Some(diffs) = self.shared_diffs.get_mut(peer_id) {
1292 diffs.remove(buffer_id);
1293 }
1294 }
1295 BufferStoreEvent::BufferDropped(buffer_id) => {
1296 self.diffs.remove(&buffer_id);
1297 for diffs in self.shared_diffs.values_mut() {
1298 diffs.remove(buffer_id);
1299 }
1300 }
1301
1302 _ => {}
1303 }
1304 }
1305
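    /// Recalculates diffs and reparses conflict markers for the given buffers,
    /// returning a future that resolves once all recalculation has finished.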
1306 pub fn recalculate_buffer_diffs(
1307 &mut self,
1308 buffers: Vec<Entity<Buffer>>,
1309 cx: &mut Context<Self>,
1310 ) -> impl Future<Output = ()> + use<> {
1311 let mut futures = Vec::new();
1312 for buffer in buffers {
1313 if let Some(diff_state) = self.diffs.get_mut(&buffer.read(cx).remote_id()) {
1314 let buffer = buffer.read(cx).text_snapshot();
1315 diff_state.update(cx, |diff_state, cx| {
1316 diff_state.recalculate_diffs(buffer.clone(), cx);
1317 futures.extend(diff_state.wait_for_recalculation().map(FutureExt::boxed));
1318 });
1319 futures.push(diff_state.update(cx, |diff_state, cx| {
1320 diff_state
1321 .reparse_conflict_markers(buffer, cx)
1322 .map(|_| {})
1323 .boxed()
1324 }));
1325 }
1326 }
1327 async move {
1328 futures::future::join_all(futures).await;
1329 }
1330 }
1331
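    /// When hunks are staged or unstaged in a diff, schedules a job to write the
    /// new index text to disk, clearing the pending hunks and reporting the error
    /// if the write fails.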
1332 fn on_buffer_diff_event(
1333 &mut self,
1334 diff: Entity<buffer_diff::BufferDiff>,
1335 event: &BufferDiffEvent,
1336 cx: &mut Context<Self>,
1337 ) {
1338 if let BufferDiffEvent::HunksStagedOrUnstaged(new_index_text) = event {
1339 let buffer_id = diff.read(cx).buffer_id;
1340 if let Some(diff_state) = self.diffs.get(&buffer_id) {
1341 let hunk_staging_operation_count = diff_state.update(cx, |diff_state, _| {
1342 diff_state.hunk_staging_operation_count += 1;
1343 diff_state.hunk_staging_operation_count
1344 });
1345 if let Some((repo, path)) = self.repository_and_path_for_buffer_id(buffer_id, cx) {
1346 let recv = repo.update(cx, |repo, cx| {
1347 log::debug!("hunks changed for {}", path.display());
1348 repo.spawn_set_index_text_job(
1349 path,
1350 new_index_text.as_ref().map(|rope| rope.to_string()),
1351 Some(hunk_staging_operation_count),
1352 cx,
1353 )
1354 });
1355 let diff = diff.downgrade();
1356 cx.spawn(async move |this, cx| {
1357 if let Ok(Err(error)) = cx.background_spawn(recv).await {
1358 diff.update(cx, |diff, cx| {
1359 diff.clear_pending_hunks(cx);
1360 })
1361 .ok();
1362 this.update(cx, |_, cx| cx.emit(GitStoreEvent::IndexWriteError(error)))
1363 .ok();
1364 }
1365 })
1366 .detach();
1367 }
1368 }
1369 }
1370 }
1371
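    /// Reloads buffer diff bases for any repository whose working directory was
    /// affected by the given worktree changes.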
1372 fn local_worktree_git_repos_changed(
1373 &mut self,
1374 worktree: Entity<Worktree>,
1375 changed_repos: &UpdatedGitRepositoriesSet,
1376 cx: &mut Context<Self>,
1377 ) {
1378 log::debug!("local worktree repos changed");
1379 debug_assert!(worktree.read(cx).is_local());
1380
1381 for repository in self.repositories.values() {
1382 repository.update(cx, |repository, cx| {
1383 let repo_abs_path = &repository.work_directory_abs_path;
1384 if changed_repos.iter().any(|update| {
1385 update.old_work_directory_abs_path.as_ref() == Some(&repo_abs_path)
1386 || update.new_work_directory_abs_path.as_ref() == Some(&repo_abs_path)
1387 }) {
1388 repository.reload_buffer_diff_bases(cx);
1389 }
1390 });
1391 }
1392 }
1393
1394 pub fn repositories(&self) -> &HashMap<RepositoryId, Entity<Repository>> {
1395 &self.repositories
1396 }
1397
1398 pub fn status_for_buffer_id(&self, buffer_id: BufferId, cx: &App) -> Option<FileStatus> {
1399 let (repo, path) = self.repository_and_path_for_buffer_id(buffer_id, cx)?;
1400 let status = repo.read(cx).snapshot.status_for_path(&path)?;
1401 Some(status.status)
1402 }
1403
1404 pub fn repository_and_path_for_buffer_id(
1405 &self,
1406 buffer_id: BufferId,
1407 cx: &App,
1408 ) -> Option<(Entity<Repository>, RepoPath)> {
1409 let buffer = self.buffer_store.read(cx).get(buffer_id)?;
1410 let project_path = buffer.read(cx).project_path(cx)?;
1411 self.repository_and_path_for_project_path(&project_path, cx)
1412 }
1413
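    /// Finds the repository containing the given project path, preferring the most
    /// deeply nested working directory, and returns the path relative to it.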
1414 pub fn repository_and_path_for_project_path(
1415 &self,
1416 path: &ProjectPath,
1417 cx: &App,
1418 ) -> Option<(Entity<Repository>, RepoPath)> {
1419 let abs_path = self.worktree_store.read(cx).absolutize(path, cx)?;
1420 self.repositories
1421 .values()
1422 .filter_map(|repo| {
1423 let repo_path = repo.read(cx).abs_path_to_repo_path(&abs_path)?;
1424 Some((repo.clone(), repo_path))
1425 })
1426 .max_by_key(|(repo, _)| repo.read(cx).work_directory_abs_path.clone())
1427 }
1428
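    /// Initializes a new Git repository at the given path, either directly through
    /// the filesystem or by forwarding the request to the upstream project.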
1429 pub fn git_init(
1430 &self,
1431 path: Arc<Path>,
1432 fallback_branch_name: String,
1433 cx: &App,
1434 ) -> Task<Result<()>> {
1435 match &self.state {
1436 GitStoreState::Local { fs, .. } => {
1437 let fs = fs.clone();
1438 cx.background_executor()
1439 .spawn(async move { fs.git_init(&path, fallback_branch_name) })
1440 }
1441 GitStoreState::Ssh {
1442 upstream_client,
1443 upstream_project_id: project_id,
1444 ..
1445 }
1446 | GitStoreState::Remote {
1447 upstream_client,
1448 upstream_project_id: project_id,
1449 ..
1450 } => {
1451 let client = upstream_client.clone();
1452 let project_id = *project_id;
1453 cx.background_executor().spawn(async move {
1454 client
1455 .request(proto::GitInit {
1456 project_id: project_id.0,
1457 abs_path: path.to_string_lossy().to_string(),
1458 fallback_branch_name,
1459 })
1460 .await?;
1461 Ok(())
1462 })
1463 }
1464 }
1465 }
1466
1467 async fn handle_update_repository(
1468 this: Entity<Self>,
1469 envelope: TypedEnvelope<proto::UpdateRepository>,
1470 mut cx: AsyncApp,
1471 ) -> Result<()> {
1472 this.update(&mut cx, |this, cx| {
1473 let mut update = envelope.payload;
1474
1475 let id = RepositoryId::from_proto(update.id);
1476 let client = this
1477 .upstream_client()
1478 .context("no upstream client")?
1479 .clone();
1480
1481 let mut is_new = false;
1482 let repo = this.repositories.entry(id).or_insert_with(|| {
1483 is_new = true;
1484 let git_store = cx.weak_entity();
1485 cx.new(|cx| {
1486 Repository::remote(
1487 id,
1488 Path::new(&update.abs_path).into(),
1489 ProjectId(update.project_id),
1490 client,
1491 git_store,
1492 cx,
1493 )
1494 })
1495 });
1496 if is_new {
1497 this._subscriptions
1498 .push(cx.subscribe(&repo, Self::on_repository_event))
1499 }
1500
1501 repo.update(cx, {
1502 let update = update.clone();
1503 |repo, cx| repo.apply_remote_update(update, is_new, cx)
1504 })?;
1505
1506 this.active_repo_id.get_or_insert_with(|| {
1507 cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
1508 id
1509 });
1510
1511 if let Some((client, project_id)) = this.downstream_client() {
1512 update.project_id = project_id.to_proto();
1513 client.send(update).log_err();
1514 }
1515 Ok(())
1516 })?
1517 }
1518
1519 async fn handle_remove_repository(
1520 this: Entity<Self>,
1521 envelope: TypedEnvelope<proto::RemoveRepository>,
1522 mut cx: AsyncApp,
1523 ) -> Result<()> {
1524 this.update(&mut cx, |this, cx| {
1525 let mut update = envelope.payload;
1526 let id = RepositoryId::from_proto(update.id);
1527 this.repositories.remove(&id);
1528 if let Some((client, project_id)) = this.downstream_client() {
1529 update.project_id = project_id.to_proto();
1530 client.send(update).log_err();
1531 }
1532 if this.active_repo_id == Some(id) {
1533 this.active_repo_id = None;
1534 cx.emit(GitStoreEvent::ActiveRepositoryChanged(None));
1535 }
1536 cx.emit(GitStoreEvent::RepositoryRemoved(id));
1537 })
1538 }
1539
1540 async fn handle_git_init(
1541 this: Entity<Self>,
1542 envelope: TypedEnvelope<proto::GitInit>,
1543 cx: AsyncApp,
1544 ) -> Result<proto::Ack> {
1545 let path: Arc<Path> = PathBuf::from(envelope.payload.abs_path).into();
1546 let name = envelope.payload.fallback_branch_name;
1547 cx.update(|cx| this.read(cx).git_init(path, name, cx))?
1548 .await?;
1549
1550 Ok(proto::Ack {})
1551 }
1552
1553 async fn handle_fetch(
1554 this: Entity<Self>,
1555 envelope: TypedEnvelope<proto::Fetch>,
1556 mut cx: AsyncApp,
1557 ) -> Result<proto::RemoteMessageResponse> {
1558 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1559 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1560 let fetch_options = FetchOptions::from_proto(envelope.payload.remote);
1561 let askpass_id = envelope.payload.askpass_id;
1562
1563 let askpass = make_remote_delegate(
1564 this,
1565 envelope.payload.project_id,
1566 repository_id,
1567 askpass_id,
1568 &mut cx,
1569 );
1570
1571 let remote_output = repository_handle
1572 .update(&mut cx, |repository_handle, cx| {
1573 repository_handle.fetch(fetch_options, askpass, cx)
1574 })?
1575 .await??;
1576
1577 Ok(proto::RemoteMessageResponse {
1578 stdout: remote_output.stdout,
1579 stderr: remote_output.stderr,
1580 })
1581 }
1582
1583 async fn handle_push(
1584 this: Entity<Self>,
1585 envelope: TypedEnvelope<proto::Push>,
1586 mut cx: AsyncApp,
1587 ) -> Result<proto::RemoteMessageResponse> {
1588 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1589 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1590
1591 let askpass_id = envelope.payload.askpass_id;
1592 let askpass = make_remote_delegate(
1593 this,
1594 envelope.payload.project_id,
1595 repository_id,
1596 askpass_id,
1597 &mut cx,
1598 );
1599
1600 let options = envelope
1601 .payload
1602 .options
1603 .as_ref()
1604 .map(|_| match envelope.payload.options() {
1605 proto::push::PushOptions::SetUpstream => git::repository::PushOptions::SetUpstream,
1606 proto::push::PushOptions::Force => git::repository::PushOptions::Force,
1607 });
1608
1609 let branch_name = envelope.payload.branch_name.into();
1610 let remote_name = envelope.payload.remote_name.into();
1611
1612 let remote_output = repository_handle
1613 .update(&mut cx, |repository_handle, cx| {
1614 repository_handle.push(branch_name, remote_name, options, askpass, cx)
1615 })?
1616 .await??;
1617 Ok(proto::RemoteMessageResponse {
1618 stdout: remote_output.stdout,
1619 stderr: remote_output.stderr,
1620 })
1621 }
1622
1623 async fn handle_pull(
1624 this: Entity<Self>,
1625 envelope: TypedEnvelope<proto::Pull>,
1626 mut cx: AsyncApp,
1627 ) -> Result<proto::RemoteMessageResponse> {
1628 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1629 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1630 let askpass_id = envelope.payload.askpass_id;
1631 let askpass = make_remote_delegate(
1632 this,
1633 envelope.payload.project_id,
1634 repository_id,
1635 askpass_id,
1636 &mut cx,
1637 );
1638
1639 let branch_name = envelope.payload.branch_name.into();
1640 let remote_name = envelope.payload.remote_name.into();
1641
1642 let remote_message = repository_handle
1643 .update(&mut cx, |repository_handle, cx| {
1644 repository_handle.pull(branch_name, remote_name, askpass, cx)
1645 })?
1646 .await??;
1647
1648 Ok(proto::RemoteMessageResponse {
1649 stdout: remote_message.stdout,
1650 stderr: remote_message.stderr,
1651 })
1652 }
1653
1654 async fn handle_stage(
1655 this: Entity<Self>,
1656 envelope: TypedEnvelope<proto::Stage>,
1657 mut cx: AsyncApp,
1658 ) -> Result<proto::Ack> {
1659 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1660 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1661
1662 let entries = envelope
1663 .payload
1664 .paths
1665 .into_iter()
1666 .map(PathBuf::from)
1667 .map(RepoPath::new)
1668 .collect();
1669
1670 repository_handle
1671 .update(&mut cx, |repository_handle, cx| {
1672 repository_handle.stage_entries(entries, cx)
1673 })?
1674 .await?;
1675 Ok(proto::Ack {})
1676 }
1677
1678 async fn handle_unstage(
1679 this: Entity<Self>,
1680 envelope: TypedEnvelope<proto::Unstage>,
1681 mut cx: AsyncApp,
1682 ) -> Result<proto::Ack> {
1683 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1684 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1685
1686 let entries = envelope
1687 .payload
1688 .paths
1689 .into_iter()
1690 .map(PathBuf::from)
1691 .map(RepoPath::new)
1692 .collect();
1693
1694 repository_handle
1695 .update(&mut cx, |repository_handle, cx| {
1696 repository_handle.unstage_entries(entries, cx)
1697 })?
1698 .await?;
1699
1700 Ok(proto::Ack {})
1701 }
1702
1703 async fn handle_set_index_text(
1704 this: Entity<Self>,
1705 envelope: TypedEnvelope<proto::SetIndexText>,
1706 mut cx: AsyncApp,
1707 ) -> Result<proto::Ack> {
1708 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1709 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1710 let repo_path = RepoPath::from_str(&envelope.payload.path);
1711
1712 repository_handle
1713 .update(&mut cx, |repository_handle, cx| {
1714 repository_handle.spawn_set_index_text_job(
1715 repo_path,
1716 envelope.payload.text,
1717 None,
1718 cx,
1719 )
1720 })?
1721 .await??;
1722 Ok(proto::Ack {})
1723 }
1724
1725 async fn handle_commit(
1726 this: Entity<Self>,
1727 envelope: TypedEnvelope<proto::Commit>,
1728 mut cx: AsyncApp,
1729 ) -> Result<proto::Ack> {
1730 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1731 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1732
1733 let message = SharedString::from(envelope.payload.message);
1734 let name = envelope.payload.name.map(SharedString::from);
1735 let email = envelope.payload.email.map(SharedString::from);
1736 let options = envelope.payload.options.unwrap_or_default();
1737
1738 repository_handle
1739 .update(&mut cx, |repository_handle, cx| {
1740 repository_handle.commit(
1741 message,
1742 name.zip(email),
1743 CommitOptions {
1744 amend: options.amend,
1745 },
1746 cx,
1747 )
1748 })?
1749 .await??;
1750 Ok(proto::Ack {})
1751 }
1752
1753 async fn handle_get_remotes(
1754 this: Entity<Self>,
1755 envelope: TypedEnvelope<proto::GetRemotes>,
1756 mut cx: AsyncApp,
1757 ) -> Result<proto::GetRemotesResponse> {
1758 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1759 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1760
1761 let branch_name = envelope.payload.branch_name;
1762
1763 let remotes = repository_handle
1764 .update(&mut cx, |repository_handle, _| {
1765 repository_handle.get_remotes(branch_name)
1766 })?
1767 .await??;
1768
1769 Ok(proto::GetRemotesResponse {
1770 remotes: remotes
1771 .into_iter()
1772 .map(|remotes| proto::get_remotes_response::Remote {
1773 name: remotes.name.to_string(),
1774 })
1775 .collect::<Vec<_>>(),
1776 })
1777 }
1778
1779 async fn handle_get_branches(
1780 this: Entity<Self>,
1781 envelope: TypedEnvelope<proto::GitGetBranches>,
1782 mut cx: AsyncApp,
1783 ) -> Result<proto::GitBranchesResponse> {
1784 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1785 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1786
1787 let branches = repository_handle
1788 .update(&mut cx, |repository_handle, _| repository_handle.branches())?
1789 .await??;
1790
1791 Ok(proto::GitBranchesResponse {
1792 branches: branches
1793 .into_iter()
1794 .map(|branch| branch_to_proto(&branch))
1795 .collect::<Vec<_>>(),
1796 })
1797 }
1798 async fn handle_create_branch(
1799 this: Entity<Self>,
1800 envelope: TypedEnvelope<proto::GitCreateBranch>,
1801 mut cx: AsyncApp,
1802 ) -> Result<proto::Ack> {
1803 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1804 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1805 let branch_name = envelope.payload.branch_name;
1806
1807 repository_handle
1808 .update(&mut cx, |repository_handle, _| {
1809 repository_handle.create_branch(branch_name)
1810 })?
1811 .await??;
1812
1813 Ok(proto::Ack {})
1814 }
1815
1816 async fn handle_change_branch(
1817 this: Entity<Self>,
1818 envelope: TypedEnvelope<proto::GitChangeBranch>,
1819 mut cx: AsyncApp,
1820 ) -> Result<proto::Ack> {
1821 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1822 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1823 let branch_name = envelope.payload.branch_name;
1824
1825 repository_handle
1826 .update(&mut cx, |repository_handle, _| {
1827 repository_handle.change_branch(branch_name)
1828 })?
1829 .await??;
1830
1831 Ok(proto::Ack {})
1832 }
1833
1834 async fn handle_show(
1835 this: Entity<Self>,
1836 envelope: TypedEnvelope<proto::GitShow>,
1837 mut cx: AsyncApp,
1838 ) -> Result<proto::GitCommitDetails> {
1839 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1840 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1841
1842 let commit = repository_handle
1843 .update(&mut cx, |repository_handle, _| {
1844 repository_handle.show(envelope.payload.commit)
1845 })?
1846 .await??;
1847 Ok(proto::GitCommitDetails {
1848 sha: commit.sha.into(),
1849 message: commit.message.into(),
1850 commit_timestamp: commit.commit_timestamp,
1851 author_email: commit.author_email.into(),
1852 author_name: commit.author_name.into(),
1853 })
1854 }
1855
1856 async fn handle_load_commit_diff(
1857 this: Entity<Self>,
1858 envelope: TypedEnvelope<proto::LoadCommitDiff>,
1859 mut cx: AsyncApp,
1860 ) -> Result<proto::LoadCommitDiffResponse> {
1861 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1862 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1863
1864 let commit_diff = repository_handle
1865 .update(&mut cx, |repository_handle, _| {
1866 repository_handle.load_commit_diff(envelope.payload.commit)
1867 })?
1868 .await??;
1869 Ok(proto::LoadCommitDiffResponse {
1870 files: commit_diff
1871 .files
1872 .into_iter()
1873 .map(|file| proto::CommitFile {
1874 path: file.path.to_string(),
1875 old_text: file.old_text,
1876 new_text: file.new_text,
1877 })
1878 .collect(),
1879 })
1880 }
1881
1882 async fn handle_reset(
1883 this: Entity<Self>,
1884 envelope: TypedEnvelope<proto::GitReset>,
1885 mut cx: AsyncApp,
1886 ) -> Result<proto::Ack> {
1887 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1888 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1889
1890 let mode = match envelope.payload.mode() {
1891 git_reset::ResetMode::Soft => ResetMode::Soft,
1892 git_reset::ResetMode::Mixed => ResetMode::Mixed,
1893 };
1894
1895 repository_handle
1896 .update(&mut cx, |repository_handle, cx| {
1897 repository_handle.reset(envelope.payload.commit, mode, cx)
1898 })?
1899 .await??;
1900 Ok(proto::Ack {})
1901 }
1902
1903 async fn handle_checkout_files(
1904 this: Entity<Self>,
1905 envelope: TypedEnvelope<proto::GitCheckoutFiles>,
1906 mut cx: AsyncApp,
1907 ) -> Result<proto::Ack> {
1908 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1909 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1910 let paths = envelope
1911 .payload
1912 .paths
1913 .iter()
1914 .map(|s| RepoPath::from_str(s))
1915 .collect();
1916
1917 repository_handle
1918 .update(&mut cx, |repository_handle, cx| {
1919 repository_handle.checkout_files(&envelope.payload.commit, paths, cx)
1920 })?
1921 .await??;
1922 Ok(proto::Ack {})
1923 }
1924
1925 async fn handle_open_commit_message_buffer(
1926 this: Entity<Self>,
1927 envelope: TypedEnvelope<proto::OpenCommitMessageBuffer>,
1928 mut cx: AsyncApp,
1929 ) -> Result<proto::OpenBufferResponse> {
1930 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1931 let repository = Self::repository_for_request(&this, repository_id, &mut cx)?;
1932 let buffer = repository
1933 .update(&mut cx, |repository, cx| {
1934 repository.open_commit_buffer(None, this.read(cx).buffer_store.clone(), cx)
1935 })?
1936 .await?;
1937
1938 let buffer_id = buffer.read_with(&cx, |buffer, _| buffer.remote_id())?;
1939 this.update(&mut cx, |this, cx| {
1940 this.buffer_store.update(cx, |buffer_store, cx| {
1941 buffer_store
1942 .create_buffer_for_peer(
1943 &buffer,
1944 envelope.original_sender_id.unwrap_or(envelope.sender_id),
1945 cx,
1946 )
1947 .detach_and_log_err(cx);
1948 })
1949 })?;
1950
1951 Ok(proto::OpenBufferResponse {
1952 buffer_id: buffer_id.to_proto(),
1953 })
1954 }
1955
1956 async fn handle_askpass(
1957 this: Entity<Self>,
1958 envelope: TypedEnvelope<proto::AskPassRequest>,
1959 mut cx: AsyncApp,
1960 ) -> Result<proto::AskPassResponse> {
1961 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1962 let repository = Self::repository_for_request(&this, repository_id, &mut cx)?;
1963
1964 let delegates = cx.update(|cx| repository.read(cx).askpass_delegates.clone())?;
1965 let Some(mut askpass) = delegates.lock().remove(&envelope.payload.askpass_id) else {
1966 debug_panic!("no askpass found");
1967 anyhow::bail!("no askpass found");
1968 };
1969
1970 let response = askpass.ask_password(envelope.payload.prompt).await?;
1971
1972 delegates
1973 .lock()
1974 .insert(envelope.payload.askpass_id, askpass);
1975
1976 Ok(proto::AskPassResponse { response })
1977 }
1978
1979 async fn handle_check_for_pushed_commits(
1980 this: Entity<Self>,
1981 envelope: TypedEnvelope<proto::CheckForPushedCommits>,
1982 mut cx: AsyncApp,
1983 ) -> Result<proto::CheckForPushedCommitsResponse> {
1984 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1985 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1986
1987 let branches = repository_handle
1988 .update(&mut cx, |repository_handle, _| {
1989 repository_handle.check_for_pushed_commits()
1990 })?
1991 .await??;
1992 Ok(proto::CheckForPushedCommitsResponse {
1993 pushed_to: branches
1994 .into_iter()
                .map(|branch| branch.to_string())
1996 .collect(),
1997 })
1998 }
1999
2000 async fn handle_git_diff(
2001 this: Entity<Self>,
2002 envelope: TypedEnvelope<proto::GitDiff>,
2003 mut cx: AsyncApp,
2004 ) -> Result<proto::GitDiffResponse> {
2005 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2006 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2007 let diff_type = match envelope.payload.diff_type() {
2008 proto::git_diff::DiffType::HeadToIndex => DiffType::HeadToIndex,
2009 proto::git_diff::DiffType::HeadToWorktree => DiffType::HeadToWorktree,
2010 };
2011
2012 let mut diff = repository_handle
2013 .update(&mut cx, |repository_handle, cx| {
2014 repository_handle.diff(diff_type, cx)
2015 })?
2016 .await??;
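        // Cap very large diffs before sending them over the wire. Truncation is
        // done by character count so a UTF-8 code point is never split, which
        // means the payload can exceed ONE_MB bytes when the diff contains
        // multi-byte characters.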
2017 const ONE_MB: usize = 1_000_000;
2018 if diff.len() > ONE_MB {
2019 diff = diff.chars().take(ONE_MB).collect()
2020 }
2021
2022 Ok(proto::GitDiffResponse { diff })
2023 }
2024
2025 async fn handle_open_unstaged_diff(
2026 this: Entity<Self>,
2027 request: TypedEnvelope<proto::OpenUnstagedDiff>,
2028 mut cx: AsyncApp,
2029 ) -> Result<proto::OpenUnstagedDiffResponse> {
2030 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2031 let diff = this
2032 .update(&mut cx, |this, cx| {
2033 let buffer = this.buffer_store.read(cx).get(buffer_id)?;
2034 Some(this.open_unstaged_diff(buffer, cx))
2035 })?
2036 .context("missing buffer")?
2037 .await?;
2038 this.update(&mut cx, |this, _| {
2039 let shared_diffs = this
2040 .shared_diffs
2041 .entry(request.original_sender_id.unwrap_or(request.sender_id))
2042 .or_default();
2043 shared_diffs.entry(buffer_id).or_default().unstaged = Some(diff.clone());
2044 })?;
2045 let staged_text = diff.read_with(&cx, |diff, _| diff.base_text_string())?;
2046 Ok(proto::OpenUnstagedDiffResponse { staged_text })
2047 }
2048
2049 async fn handle_open_uncommitted_diff(
2050 this: Entity<Self>,
2051 request: TypedEnvelope<proto::OpenUncommittedDiff>,
2052 mut cx: AsyncApp,
2053 ) -> Result<proto::OpenUncommittedDiffResponse> {
2054 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2055 let diff = this
2056 .update(&mut cx, |this, cx| {
2057 let buffer = this.buffer_store.read(cx).get(buffer_id)?;
2058 Some(this.open_uncommitted_diff(buffer, cx))
2059 })?
2060 .context("missing buffer")?
2061 .await?;
2062 this.update(&mut cx, |this, _| {
2063 let shared_diffs = this
2064 .shared_diffs
2065 .entry(request.original_sender_id.unwrap_or(request.sender_id))
2066 .or_default();
2067 shared_diffs.entry(buffer_id).or_default().uncommitted = Some(diff.clone());
2068 })?;
2069 Ok(diff.read_with(&cx, |diff, cx| {
2070 use proto::open_uncommitted_diff_response::Mode;
2071
2072 let unstaged_diff = diff.secondary_diff();
2073 let index_snapshot = unstaged_diff.and_then(|diff| {
2074 let diff = diff.read(cx);
2075 diff.base_text_exists().then(|| diff.base_text())
2076 });
2077
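            // Decide how much base text to send back: if the index snapshot is
            // the same buffer as the committed snapshot, the peer can reuse the
            // committed text for the staged side (IndexMatchesHead); otherwise,
            // send whichever base texts exist separately (IndexAndHead).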
2078 let mode;
2079 let staged_text;
2080 let committed_text;
2081 if diff.base_text_exists() {
2082 let committed_snapshot = diff.base_text();
2083 committed_text = Some(committed_snapshot.text().to_string());
2084 if let Some(index_text) = index_snapshot {
2085 if index_text.remote_id() == committed_snapshot.remote_id() {
2086 mode = Mode::IndexMatchesHead;
2087 staged_text = None;
2088 } else {
2089 mode = Mode::IndexAndHead;
2090 staged_text = Some(index_text.text().to_string());
2091 }
2092 } else {
2093 mode = Mode::IndexAndHead;
2094 staged_text = None;
2095 }
2096 } else {
2097 mode = Mode::IndexAndHead;
2098 committed_text = None;
2099 staged_text = index_snapshot
2100 .as_ref()
2101 .map(|buffer| buffer.text().to_string());
2102 }
2103
2104 proto::OpenUncommittedDiffResponse {
2105 committed_text,
2106 staged_text,
2107 mode: mode as i32,
2108 }
2109 })?)
2110 }
2111
2112 async fn handle_update_diff_bases(
2113 this: Entity<Self>,
2114 request: TypedEnvelope<proto::UpdateDiffBases>,
2115 mut cx: AsyncApp,
2116 ) -> Result<()> {
2117 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2118 this.update(&mut cx, |this, cx| {
2119 if let Some(diff_state) = this.diffs.get_mut(&buffer_id) {
2120 if let Some(buffer) = this.buffer_store.read(cx).get(buffer_id) {
2121 let buffer = buffer.read(cx).text_snapshot();
2122 diff_state.update(cx, |diff_state, cx| {
2123 diff_state.handle_base_texts_updated(buffer, request.payload, cx);
2124 })
2125 }
2126 }
2127 })
2128 }
2129
2130 async fn handle_blame_buffer(
2131 this: Entity<Self>,
2132 envelope: TypedEnvelope<proto::BlameBuffer>,
2133 mut cx: AsyncApp,
2134 ) -> Result<proto::BlameBufferResponse> {
2135 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2136 let version = deserialize_version(&envelope.payload.version);
2137 let buffer = this.read_with(&cx, |this, cx| {
2138 this.buffer_store.read(cx).get_existing(buffer_id)
2139 })??;
2140 buffer
2141 .update(&mut cx, |buffer, _| {
2142 buffer.wait_for_version(version.clone())
2143 })?
2144 .await?;
2145 let blame = this
2146 .update(&mut cx, |this, cx| {
2147 this.blame_buffer(&buffer, Some(version), cx)
2148 })?
2149 .await?;
2150 Ok(serialize_blame_buffer_response(blame))
2151 }
2152
2153 async fn handle_get_permalink_to_line(
2154 this: Entity<Self>,
2155 envelope: TypedEnvelope<proto::GetPermalinkToLine>,
2156 mut cx: AsyncApp,
2157 ) -> Result<proto::GetPermalinkToLineResponse> {
2158 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2160 let selection = {
2161 let proto_selection = envelope
2162 .payload
2163 .selection
                .context("missing selection for permalink request")?;
2165 proto_selection.start as u32..proto_selection.end as u32
2166 };
2167 let buffer = this.read_with(&cx, |this, cx| {
2168 this.buffer_store.read(cx).get_existing(buffer_id)
2169 })??;
2170 let permalink = this
2171 .update(&mut cx, |this, cx| {
2172 this.get_permalink_to_line(&buffer, selection, cx)
2173 })?
2174 .await?;
2175 Ok(proto::GetPermalinkToLineResponse {
2176 permalink: permalink.to_string(),
2177 })
2178 }
2179
2180 fn repository_for_request(
2181 this: &Entity<Self>,
2182 id: RepositoryId,
2183 cx: &mut AsyncApp,
2184 ) -> Result<Entity<Repository>> {
2185 this.read_with(cx, |this, _| {
2186 this.repositories
2187 .get(&id)
2188 .context("missing repository handle")
2189 .cloned()
2190 })?
2191 }
2192
2193 pub fn repo_snapshots(&self, cx: &App) -> HashMap<RepositoryId, RepositorySnapshot> {
2194 self.repositories
2195 .iter()
2196 .map(|(id, repo)| (*id, repo.read(cx).snapshot.clone()))
2197 .collect()
2198 }
2199}
2200
2201impl BufferGitState {
2202 fn new(_git_store: WeakEntity<GitStore>) -> Self {
2203 Self {
2204 unstaged_diff: Default::default(),
2205 uncommitted_diff: Default::default(),
2206 recalculate_diff_task: Default::default(),
2207 language: Default::default(),
2208 language_registry: Default::default(),
2209 recalculating_tx: postage::watch::channel_with(false).0,
2210 hunk_staging_operation_count: 0,
2211 hunk_staging_operation_count_as_of_write: 0,
2212 head_text: Default::default(),
2213 index_text: Default::default(),
2214 head_changed: Default::default(),
2215 index_changed: Default::default(),
2216 language_changed: Default::default(),
2217 conflict_updated_futures: Default::default(),
2218 conflict_set: Default::default(),
2219 reparse_conflict_markers_task: Default::default(),
2220 }
2221 }
2222
2223 fn buffer_language_changed(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) {
2224 self.language = buffer.read(cx).language().cloned();
2225 self.language_changed = true;
2226 let _ = self.recalculate_diffs(buffer.read(cx).text_snapshot(), cx);
2227 }
2228
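    /// Re-parses git conflict markers for the given buffer snapshot on a
    /// background task and applies the result to this buffer's `ConflictSet`.
    ///
    /// The returned receiver fires once the conflict set has been updated. If
    /// there is no live conflict set, or it is not currently marked as having
    /// a conflict, no work is scheduled and the receiver is dropped unfired.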
2229 fn reparse_conflict_markers(
2230 &mut self,
2231 buffer: text::BufferSnapshot,
2232 cx: &mut Context<Self>,
2233 ) -> oneshot::Receiver<()> {
2234 let (tx, rx) = oneshot::channel();
2235
2236 let Some(conflict_set) = self
2237 .conflict_set
2238 .as_ref()
2239 .and_then(|conflict_set| conflict_set.upgrade())
2240 else {
2241 return rx;
2242 };
2243
2244 let old_snapshot = conflict_set.read_with(cx, |conflict_set, _| {
2245 if conflict_set.has_conflict {
2246 Some(conflict_set.snapshot())
2247 } else {
2248 None
2249 }
2250 });
2251
2252 if let Some(old_snapshot) = old_snapshot {
2253 self.conflict_updated_futures.push(tx);
2254 self.reparse_conflict_markers_task = Some(cx.spawn(async move |this, cx| {
2255 let (snapshot, changed_range) = cx
2256 .background_spawn(async move {
2257 let new_snapshot = ConflictSet::parse(&buffer);
2258 let changed_range = old_snapshot.compare(&new_snapshot, &buffer);
2259 (new_snapshot, changed_range)
2260 })
2261 .await;
2262 this.update(cx, |this, cx| {
2263 if let Some(conflict_set) = &this.conflict_set {
2264 conflict_set
2265 .update(cx, |conflict_set, cx| {
2266 conflict_set.set_snapshot(snapshot, changed_range, cx);
2267 })
2268 .ok();
2269 }
2270 let futures = std::mem::take(&mut this.conflict_updated_futures);
2271 for tx in futures {
2272 tx.send(()).ok();
2273 }
2274 })
2275 }))
2276 }
2277
2278 rx
2279 }
2280
2281 fn unstaged_diff(&self) -> Option<Entity<BufferDiff>> {
2282 self.unstaged_diff.as_ref().and_then(|set| set.upgrade())
2283 }
2284
2285 fn uncommitted_diff(&self) -> Option<Entity<BufferDiff>> {
2286 self.uncommitted_diff.as_ref().and_then(|set| set.upgrade())
2287 }
2288
2289 fn handle_base_texts_updated(
2290 &mut self,
2291 buffer: text::BufferSnapshot,
2292 message: proto::UpdateDiffBases,
2293 cx: &mut Context<Self>,
2294 ) {
2295 use proto::update_diff_bases::Mode;
2296
2297 let Some(mode) = Mode::from_i32(message.mode) else {
2298 return;
2299 };
2300
2301 let diff_bases_change = match mode {
2302 Mode::HeadOnly => DiffBasesChange::SetHead(message.committed_text),
2303 Mode::IndexOnly => DiffBasesChange::SetIndex(message.staged_text),
2304 Mode::IndexMatchesHead => DiffBasesChange::SetBoth(message.committed_text),
2305 Mode::IndexAndHead => DiffBasesChange::SetEach {
2306 index: message.staged_text,
2307 head: message.committed_text,
2308 },
2309 };
2310
2311 self.diff_bases_changed(buffer, Some(diff_bases_change), cx);
2312 }
2313
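    /// If a diff recalculation is currently in progress, returns a future that
    /// resolves once the recalculating flag is cleared; returns `None` when no
    /// recalculation is running.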
2314 pub fn wait_for_recalculation(&mut self) -> Option<impl Future<Output = ()> + use<>> {
2315 if *self.recalculating_tx.borrow() {
2316 let mut rx = self.recalculating_tx.subscribe();
2317 return Some(async move {
2318 loop {
2319 let is_recalculating = rx.recv().await;
2320 if is_recalculating != Some(true) {
2321 break;
2322 }
2323 }
2324 });
2325 } else {
2326 None
2327 }
2328 }
2329
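    /// Records new index and/or HEAD base texts (normalizing their line
    /// endings), marks which of them changed, and schedules a diff
    /// recalculation against the given buffer snapshot.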
2330 fn diff_bases_changed(
2331 &mut self,
2332 buffer: text::BufferSnapshot,
2333 diff_bases_change: Option<DiffBasesChange>,
2334 cx: &mut Context<Self>,
2335 ) {
2336 match diff_bases_change {
2337 Some(DiffBasesChange::SetIndex(index)) => {
2338 self.index_text = index.map(|mut index| {
2339 text::LineEnding::normalize(&mut index);
2340 Arc::new(index)
2341 });
2342 self.index_changed = true;
2343 }
2344 Some(DiffBasesChange::SetHead(head)) => {
2345 self.head_text = head.map(|mut head| {
2346 text::LineEnding::normalize(&mut head);
2347 Arc::new(head)
2348 });
2349 self.head_changed = true;
2350 }
2351 Some(DiffBasesChange::SetBoth(text)) => {
2352 let text = text.map(|mut text| {
2353 text::LineEnding::normalize(&mut text);
2354 Arc::new(text)
2355 });
2356 self.head_text = text.clone();
2357 self.index_text = text;
2358 self.head_changed = true;
2359 self.index_changed = true;
2360 }
2361 Some(DiffBasesChange::SetEach { index, head }) => {
2362 self.index_text = index.map(|mut index| {
2363 text::LineEnding::normalize(&mut index);
2364 Arc::new(index)
2365 });
2366 self.index_changed = true;
2367 self.head_text = head.map(|mut head| {
2368 text::LineEnding::normalize(&mut head);
2369 Arc::new(head)
2370 });
2371 self.head_changed = true;
2372 }
2373 None => {}
2374 }
2375
2376 self.recalculate_diffs(buffer, cx)
2377 }
2378
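    /// Recomputes the unstaged and uncommitted diffs for this buffer on a
    /// background task. When the index and HEAD texts are the same `Arc`, the
    /// uncommitted diff reuses the freshly computed unstaged snapshot rather
    /// than diffing twice. The run is abandoned if additional hunk staging
    /// operations were initiated after the index write it is based on; a later
    /// recalculation will pick up the settled state.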
2379 fn recalculate_diffs(&mut self, buffer: text::BufferSnapshot, cx: &mut Context<Self>) {
2380 *self.recalculating_tx.borrow_mut() = true;
2381
2382 let language = self.language.clone();
2383 let language_registry = self.language_registry.clone();
2384 let unstaged_diff = self.unstaged_diff();
2385 let uncommitted_diff = self.uncommitted_diff();
2386 let head = self.head_text.clone();
2387 let index = self.index_text.clone();
2388 let index_changed = self.index_changed;
2389 let head_changed = self.head_changed;
2390 let language_changed = self.language_changed;
2391 let prev_hunk_staging_operation_count = self.hunk_staging_operation_count_as_of_write;
2392 let index_matches_head = match (self.index_text.as_ref(), self.head_text.as_ref()) {
2393 (Some(index), Some(head)) => Arc::ptr_eq(index, head),
2394 (None, None) => true,
2395 _ => false,
2396 };
2397 self.recalculate_diff_task = Some(cx.spawn(async move |this, cx| {
2398 log::debug!(
2399 "start recalculating diffs for buffer {}",
2400 buffer.remote_id()
2401 );
2402
2403 let mut new_unstaged_diff = None;
2404 if let Some(unstaged_diff) = &unstaged_diff {
2405 new_unstaged_diff = Some(
2406 BufferDiff::update_diff(
2407 unstaged_diff.clone(),
2408 buffer.clone(),
2409 index,
2410 index_changed,
2411 language_changed,
2412 language.clone(),
2413 language_registry.clone(),
2414 cx,
2415 )
2416 .await?,
2417 );
2418 }
2419
2420 let mut new_uncommitted_diff = None;
2421 if let Some(uncommitted_diff) = &uncommitted_diff {
2422 new_uncommitted_diff = if index_matches_head {
2423 new_unstaged_diff.clone()
2424 } else {
2425 Some(
2426 BufferDiff::update_diff(
2427 uncommitted_diff.clone(),
2428 buffer.clone(),
2429 head,
2430 head_changed,
2431 language_changed,
2432 language.clone(),
2433 language_registry.clone(),
2434 cx,
2435 )
2436 .await?,
2437 )
2438 }
2439 }
2440
2441 let cancel = this.update(cx, |this, _| {
2442 // This checks whether all pending stage/unstage operations
2443 // have quiesced (i.e. both the corresponding write and the
2444 // read of that write have completed). If not, then we cancel
2445 // this recalculation attempt to avoid invalidating pending
2446 // state too quickly; another recalculation will come along
2447 // later and clear the pending state once the state of the index has settled.
2448 if this.hunk_staging_operation_count > prev_hunk_staging_operation_count {
2449 *this.recalculating_tx.borrow_mut() = false;
2450 true
2451 } else {
2452 false
2453 }
2454 })?;
2455 if cancel {
2456 log::debug!(
2457 concat!(
                        "aborting recalculating diffs for buffer {} ",
2459 "due to subsequent hunk operations",
2460 ),
2461 buffer.remote_id()
2462 );
2463 return Ok(());
2464 }
2465
2466 let unstaged_changed_range = if let Some((unstaged_diff, new_unstaged_diff)) =
2467 unstaged_diff.as_ref().zip(new_unstaged_diff.clone())
2468 {
2469 unstaged_diff.update(cx, |diff, cx| {
2470 if language_changed {
2471 diff.language_changed(cx);
2472 }
2473 diff.set_snapshot(new_unstaged_diff, &buffer, cx)
2474 })?
2475 } else {
2476 None
2477 };
2478
2479 if let Some((uncommitted_diff, new_uncommitted_diff)) =
2480 uncommitted_diff.as_ref().zip(new_uncommitted_diff.clone())
2481 {
2482 uncommitted_diff.update(cx, |diff, cx| {
2483 if language_changed {
2484 diff.language_changed(cx);
2485 }
2486 diff.set_snapshot_with_secondary(
2487 new_uncommitted_diff,
2488 &buffer,
2489 unstaged_changed_range,
2490 true,
2491 cx,
2492 );
2493 })?;
2494 }
2495
2496 log::debug!(
2497 "finished recalculating diffs for buffer {}",
2498 buffer.remote_id()
2499 );
2500
2501 if let Some(this) = this.upgrade() {
2502 this.update(cx, |this, _| {
2503 this.index_changed = false;
2504 this.head_changed = false;
2505 this.language_changed = false;
2506 *this.recalculating_tx.borrow_mut() = false;
2507 })?;
2508 }
2509
2510 Ok(())
2511 }));
2512 }
2513}
2514
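/// Builds an `AskPassDelegate` that forwards authentication prompts from a git
/// operation to the downstream client as an `AskPassRequest`, relaying the
/// client's response back to the waiting operation.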
2515fn make_remote_delegate(
2516 this: Entity<GitStore>,
2517 project_id: u64,
2518 repository_id: RepositoryId,
2519 askpass_id: u64,
2520 cx: &mut AsyncApp,
2521) -> AskPassDelegate {
2522 AskPassDelegate::new(cx, move |prompt, tx, cx| {
2523 this.update(cx, |this, cx| {
2524 let Some((client, _)) = this.downstream_client() else {
2525 return;
2526 };
2527 let response = client.request(proto::AskPassRequest {
2528 project_id,
2529 repository_id: repository_id.to_proto(),
2530 askpass_id,
2531 prompt,
2532 });
2533 cx.spawn(async move |_, _| {
2534 tx.send(response.await?.response).ok();
2535 anyhow::Ok(())
2536 })
2537 .detach_and_log_err(cx);
2538 })
2539 .log_err();
2540 })
2541}
2542
2543impl RepositoryId {
2544 pub fn to_proto(self) -> u64 {
2545 self.0
2546 }
2547
2548 pub fn from_proto(id: u64) -> Self {
2549 RepositoryId(id)
2550 }
2551}
2552
2553impl RepositorySnapshot {
2554 fn empty(id: RepositoryId, work_directory_abs_path: Arc<Path>) -> Self {
2555 Self {
2556 id,
2557 statuses_by_path: Default::default(),
2558 work_directory_abs_path,
2559 branch: None,
2560 head_commit: None,
2561 scan_id: 0,
2562 merge: Default::default(),
2563 }
2564 }
2565
2566 fn initial_update(&self, project_id: u64) -> proto::UpdateRepository {
2567 proto::UpdateRepository {
2568 branch_summary: self.branch.as_ref().map(branch_to_proto),
2569 head_commit_details: self.head_commit.as_ref().map(commit_details_to_proto),
2570 updated_statuses: self
2571 .statuses_by_path
2572 .iter()
2573 .map(|entry| entry.to_proto())
2574 .collect(),
2575 removed_statuses: Default::default(),
2576 current_merge_conflicts: self
2577 .merge
2578 .conflicted_paths
2579 .iter()
2580 .map(|repo_path| repo_path.to_proto())
2581 .collect(),
2582 project_id,
2583 id: self.id.to_proto(),
2584 abs_path: self.work_directory_abs_path.to_proto(),
2585 entry_ids: vec![self.id.to_proto()],
2586 scan_id: self.scan_id,
2587 is_last_update: true,
2588 }
2589 }
2590
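    /// Computes the delta between `old` and `self` by walking the two sorted
    /// status lists in lockstep, collecting the statuses that were added or
    /// changed and the repo paths whose statuses were removed.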
2591 fn build_update(&self, old: &Self, project_id: u64) -> proto::UpdateRepository {
2592 let mut updated_statuses: Vec<proto::StatusEntry> = Vec::new();
2593 let mut removed_statuses: Vec<String> = Vec::new();
2594
2595 let mut new_statuses = self.statuses_by_path.iter().peekable();
2596 let mut old_statuses = old.statuses_by_path.iter().peekable();
2597
2598 let mut current_new_entry = new_statuses.next();
2599 let mut current_old_entry = old_statuses.next();
2600 loop {
2601 match (current_new_entry, current_old_entry) {
2602 (Some(new_entry), Some(old_entry)) => {
2603 match new_entry.repo_path.cmp(&old_entry.repo_path) {
2604 Ordering::Less => {
2605 updated_statuses.push(new_entry.to_proto());
2606 current_new_entry = new_statuses.next();
2607 }
2608 Ordering::Equal => {
2609 if new_entry.status != old_entry.status {
2610 updated_statuses.push(new_entry.to_proto());
2611 }
2612 current_old_entry = old_statuses.next();
2613 current_new_entry = new_statuses.next();
2614 }
2615 Ordering::Greater => {
2616 removed_statuses.push(old_entry.repo_path.as_ref().to_proto());
2617 current_old_entry = old_statuses.next();
2618 }
2619 }
2620 }
2621 (None, Some(old_entry)) => {
2622 removed_statuses.push(old_entry.repo_path.as_ref().to_proto());
2623 current_old_entry = old_statuses.next();
2624 }
2625 (Some(new_entry), None) => {
2626 updated_statuses.push(new_entry.to_proto());
2627 current_new_entry = new_statuses.next();
2628 }
2629 (None, None) => break,
2630 }
2631 }
2632
2633 proto::UpdateRepository {
2634 branch_summary: self.branch.as_ref().map(branch_to_proto),
2635 head_commit_details: self.head_commit.as_ref().map(commit_details_to_proto),
2636 updated_statuses,
2637 removed_statuses,
2638 current_merge_conflicts: self
2639 .merge
2640 .conflicted_paths
2641 .iter()
2642 .map(|path| path.as_ref().to_proto())
2643 .collect(),
2644 project_id,
2645 id: self.id.to_proto(),
2646 abs_path: self.work_directory_abs_path.to_proto(),
2647 entry_ids: vec![],
2648 scan_id: self.scan_id,
2649 is_last_update: true,
2650 }
2651 }
2652
2653 pub fn status(&self) -> impl Iterator<Item = StatusEntry> + '_ {
2654 self.statuses_by_path.iter().cloned()
2655 }
2656
2657 pub fn status_summary(&self) -> GitSummary {
2658 self.statuses_by_path.summary().item_summary
2659 }
2660
2661 pub fn status_for_path(&self, path: &RepoPath) -> Option<StatusEntry> {
2662 self.statuses_by_path
2663 .get(&PathKey(path.0.clone()), &())
2664 .cloned()
2665 }
2666
2667 pub fn abs_path_to_repo_path(&self, abs_path: &Path) -> Option<RepoPath> {
2668 abs_path
2669 .strip_prefix(&self.work_directory_abs_path)
2670 .map(RepoPath::from)
2671 .ok()
2672 }
2673
2674 pub fn had_conflict_on_last_merge_head_change(&self, repo_path: &RepoPath) -> bool {
2675 self.merge.conflicted_paths.contains(&repo_path)
2676 }
2677
2678 pub fn has_conflict(&self, repo_path: &RepoPath) -> bool {
2679 let had_conflict_on_last_merge_head_change =
2680 self.merge.conflicted_paths.contains(&repo_path);
2681 let has_conflict_currently = self
2682 .status_for_path(&repo_path)
2683 .map_or(false, |entry| entry.status.is_conflicted());
2684 had_conflict_on_last_merge_head_change || has_conflict_currently
2685 }
2686
2687 /// This is the name that will be displayed in the repository selector for this repository.
2688 pub fn display_name(&self) -> SharedString {
2689 self.work_directory_abs_path
2690 .file_name()
2691 .unwrap_or_default()
2692 .to_string_lossy()
2693 .to_string()
2694 .into()
2695 }
2696}
2697
2698impl MergeDetails {
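    /// Reads the merge-related refs (MERGE_HEAD, CHERRY_PICK_HEAD, REBASE_HEAD,
    /// REVERT_HEAD, APPLY_HEAD) and, when they changed since the previous
    /// snapshot, recomputes the set of conflicted paths from the status tree.
    /// Returns the new details plus whether the merge heads changed. If heads
    /// appeared but no conflicts are visible yet, the previous merge state is
    /// retained so a scan racing the merge doesn't miss the conflicts.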
2699 async fn load(
2700 backend: &Arc<dyn GitRepository>,
2701 status: &SumTree<StatusEntry>,
2702 prev_snapshot: &RepositorySnapshot,
2703 ) -> Result<(MergeDetails, bool)> {
2704 log::debug!("load merge details");
2705 let message = backend.merge_message().await;
2706 let heads = backend
2707 .revparse_batch(vec![
2708 "MERGE_HEAD".into(),
2709 "CHERRY_PICK_HEAD".into(),
2710 "REBASE_HEAD".into(),
2711 "REVERT_HEAD".into(),
2712 "APPLY_HEAD".into(),
2713 ])
2714 .await
2715 .log_err()
2716 .unwrap_or_default()
2717 .into_iter()
2718 .map(|opt| opt.map(SharedString::from))
2719 .collect::<Vec<_>>();
2720 let merge_heads_changed = heads != prev_snapshot.merge.heads;
2721 let conflicted_paths = if merge_heads_changed {
2722 let current_conflicted_paths = TreeSet::from_ordered_entries(
2723 status
2724 .iter()
2725 .filter(|entry| entry.status.is_conflicted())
2726 .map(|entry| entry.repo_path.clone()),
2727 );
2728
            // A scan can run while a lengthy merge is in progress that will
            // eventually produce conflicts, before `git status` reports them.
            // Since we currently only track the merge heads state in order to
            // detect conflicts, don't update that state until some conflicts
            // actually appear.
2734 if heads.iter().any(Option::is_some)
2735 && !prev_snapshot.merge.heads.iter().any(Option::is_some)
2736 && current_conflicted_paths.is_empty()
2737 {
2738 log::debug!("not updating merge heads because no conflicts found");
2739 return Ok((
2740 MergeDetails {
2741 message: message.map(SharedString::from),
2742 ..prev_snapshot.merge.clone()
2743 },
2744 false,
2745 ));
2746 }
2747
2748 current_conflicted_paths
2749 } else {
2750 prev_snapshot.merge.conflicted_paths.clone()
2751 };
2752 let details = MergeDetails {
2753 conflicted_paths,
2754 message: message.map(SharedString::from),
2755 heads,
2756 };
2757 Ok((details, merge_heads_changed))
2758 }
2759}
2760
2761impl Repository {
2762 pub fn snapshot(&self) -> RepositorySnapshot {
2763 self.snapshot.clone()
2764 }
2765
2766 fn local(
2767 id: RepositoryId,
2768 work_directory_abs_path: Arc<Path>,
2769 dot_git_abs_path: Arc<Path>,
2770 repository_dir_abs_path: Arc<Path>,
2771 common_dir_abs_path: Arc<Path>,
2772 project_environment: WeakEntity<ProjectEnvironment>,
2773 fs: Arc<dyn Fs>,
2774 git_store: WeakEntity<GitStore>,
2775 cx: &mut Context<Self>,
2776 ) -> Self {
2777 let snapshot = RepositorySnapshot::empty(id, work_directory_abs_path.clone());
2778 Repository {
2779 this: cx.weak_entity(),
2780 git_store,
2781 snapshot,
2782 commit_message_buffer: None,
2783 askpass_delegates: Default::default(),
2784 paths_needing_status_update: Default::default(),
2785 latest_askpass_id: 0,
2786 job_sender: Repository::spawn_local_git_worker(
2787 work_directory_abs_path,
2788 dot_git_abs_path,
2789 repository_dir_abs_path,
2790 common_dir_abs_path,
2791 project_environment,
2792 fs,
2793 cx,
2794 ),
2795 job_id: 0,
2796 active_jobs: Default::default(),
2797 }
2798 }
2799
2800 fn remote(
2801 id: RepositoryId,
2802 work_directory_abs_path: Arc<Path>,
2803 project_id: ProjectId,
2804 client: AnyProtoClient,
2805 git_store: WeakEntity<GitStore>,
2806 cx: &mut Context<Self>,
2807 ) -> Self {
2808 let snapshot = RepositorySnapshot::empty(id, work_directory_abs_path);
2809 Self {
2810 this: cx.weak_entity(),
2811 snapshot,
2812 commit_message_buffer: None,
2813 git_store,
2814 paths_needing_status_update: Default::default(),
2815 job_sender: Self::spawn_remote_git_worker(project_id, client, cx),
2816 askpass_delegates: Default::default(),
2817 latest_askpass_id: 0,
2818 active_jobs: Default::default(),
2819 job_id: 0,
2820 }
2821 }
2822
2823 pub fn git_store(&self) -> Option<Entity<GitStore>> {
2824 self.git_store.upgrade()
2825 }
2826
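    /// For a local repository, reloads the index and HEAD texts for every open
    /// buffer with diff state whose file belongs to this repository, works out
    /// which base texts actually changed, forwards the new bases to any
    /// downstream client, and triggers diff recalculation for those buffers.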
2827 fn reload_buffer_diff_bases(&mut self, cx: &mut Context<Self>) {
2828 let this = cx.weak_entity();
2829 let git_store = self.git_store.clone();
2830 let _ = self.send_keyed_job(
2831 Some(GitJobKey::ReloadBufferDiffBases),
2832 None,
2833 |state, mut cx| async move {
2834 let RepositoryState::Local { backend, .. } = state else {
2835 log::error!("tried to recompute diffs for a non-local repository");
2836 return Ok(());
2837 };
2838
2839 let Some(this) = this.upgrade() else {
2840 return Ok(());
2841 };
2842
2843 let repo_diff_state_updates = this.update(&mut cx, |this, cx| {
2844 git_store.update(cx, |git_store, cx| {
2845 git_store
2846 .diffs
2847 .iter()
2848 .filter_map(|(buffer_id, diff_state)| {
2849 let buffer_store = git_store.buffer_store.read(cx);
2850 let buffer = buffer_store.get(*buffer_id)?;
2851 let (worktree, path) = {
2852 let buffer_ref = buffer.read(cx);
2853 let file = File::from_dyn(buffer_ref.file())?;
2854 (file.worktree.clone(), file.path.clone())
2855 };
2856 let abs_path = {
2857 let worktree_ref = worktree.read(cx);
2858 worktree_ref.absolutize(&path).ok()?
2859 };
2860 let repo_path = this.abs_path_to_repo_path(&abs_path)?;
2861 log::debug!(
2862 "start reload diff bases for repo path {}",
2863 repo_path.0.display()
2864 );
2865 diff_state.update(cx, |diff_state, _| {
2866 let has_unstaged_diff = diff_state
2867 .unstaged_diff
2868 .as_ref()
2869 .is_some_and(|diff| diff.is_upgradable());
2870 let has_uncommitted_diff = diff_state
2871 .uncommitted_diff
2872 .as_ref()
2873 .is_some_and(|set| set.is_upgradable());
2874
2875 Some((
2876 buffer,
2877 repo_path,
2878 has_unstaged_diff.then(|| diff_state.index_text.clone()),
2879 has_uncommitted_diff.then(|| diff_state.head_text.clone()),
2880 ))
2881 })
2882 })
2883 .collect::<Vec<_>>()
2884 })
2885 })??;
2886
2887 let buffer_diff_base_changes = cx
2888 .background_spawn(async move {
2889 let mut changes = Vec::new();
2890 for (buffer, repo_path, current_index_text, current_head_text) in
2891 &repo_diff_state_updates
2892 {
2893 let index_text = if current_index_text.is_some() {
2894 backend.load_index_text(repo_path.clone()).await
2895 } else {
2896 None
2897 };
2898 let head_text = if current_head_text.is_some() {
2899 backend.load_committed_text(repo_path.clone()).await
2900 } else {
2901 None
2902 };
2903
2904 let change =
2905 match (current_index_text.as_ref(), current_head_text.as_ref()) {
2906 (Some(current_index), Some(current_head)) => {
2907 let index_changed =
2908 index_text.as_ref() != current_index.as_deref();
2909 let head_changed =
2910 head_text.as_ref() != current_head.as_deref();
2911 if index_changed && head_changed {
2912 if index_text == head_text {
2913 Some(DiffBasesChange::SetBoth(head_text))
2914 } else {
2915 Some(DiffBasesChange::SetEach {
2916 index: index_text,
2917 head: head_text,
2918 })
2919 }
2920 } else if index_changed {
2921 Some(DiffBasesChange::SetIndex(index_text))
2922 } else if head_changed {
2923 Some(DiffBasesChange::SetHead(head_text))
2924 } else {
2925 None
2926 }
2927 }
2928 (Some(current_index), None) => {
2929 let index_changed =
2930 index_text.as_ref() != current_index.as_deref();
2931 index_changed
2932 .then_some(DiffBasesChange::SetIndex(index_text))
2933 }
2934 (None, Some(current_head)) => {
2935 let head_changed =
2936 head_text.as_ref() != current_head.as_deref();
2937 head_changed.then_some(DiffBasesChange::SetHead(head_text))
2938 }
2939 (None, None) => None,
2940 };
2941
2942 changes.push((buffer.clone(), change))
2943 }
2944 changes
2945 })
2946 .await;
2947
2948 git_store.update(&mut cx, |git_store, cx| {
2949 for (buffer, diff_bases_change) in buffer_diff_base_changes {
2950 let buffer_snapshot = buffer.read(cx).text_snapshot();
2951 let buffer_id = buffer_snapshot.remote_id();
2952 let Some(diff_state) = git_store.diffs.get(&buffer_id) else {
2953 continue;
2954 };
2955
2956 let downstream_client = git_store.downstream_client();
2957 diff_state.update(cx, |diff_state, cx| {
2958 use proto::update_diff_bases::Mode;
2959
2960 if let Some((diff_bases_change, (client, project_id))) =
2961 diff_bases_change.clone().zip(downstream_client)
2962 {
2963 let (staged_text, committed_text, mode) = match diff_bases_change {
2964 DiffBasesChange::SetIndex(index) => {
2965 (index, None, Mode::IndexOnly)
2966 }
2967 DiffBasesChange::SetHead(head) => (None, head, Mode::HeadOnly),
2968 DiffBasesChange::SetEach { index, head } => {
2969 (index, head, Mode::IndexAndHead)
2970 }
2971 DiffBasesChange::SetBoth(text) => {
2972 (None, text, Mode::IndexMatchesHead)
2973 }
2974 };
2975 client
2976 .send(proto::UpdateDiffBases {
2977 project_id: project_id.to_proto(),
2978 buffer_id: buffer_id.to_proto(),
2979 staged_text,
2980 committed_text,
2981 mode: mode as i32,
2982 })
2983 .log_err();
2984 }
2985
2986 diff_state.diff_bases_changed(buffer_snapshot, diff_bases_change, cx);
2987 });
2988 }
2989 })
2990 },
2991 );
2992 }
2993
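    /// Enqueues a job on this repository's git worker; equivalent to
    /// `send_keyed_job` with no key.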
2994 pub fn send_job<F, Fut, R>(
2995 &mut self,
2996 status: Option<SharedString>,
2997 job: F,
2998 ) -> oneshot::Receiver<R>
2999 where
3000 F: FnOnce(RepositoryState, AsyncApp) -> Fut + 'static,
3001 Fut: Future<Output = R> + 'static,
3002 R: Send + 'static,
3003 {
3004 self.send_keyed_job(None, status, job)
3005 }
3006
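    /// Enqueues a job on this repository's git worker. The optional `key`
    /// identifies related jobs (for example, index writes for the same path);
    /// how such jobs are coalesced or ordered is up to the worker. When a
    /// `status` message is provided, it is recorded in `active_jobs` for the
    /// duration of the job and observers are notified. The returned receiver
    /// yields the job's result.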
3007 fn send_keyed_job<F, Fut, R>(
3008 &mut self,
3009 key: Option<GitJobKey>,
3010 status: Option<SharedString>,
3011 job: F,
3012 ) -> oneshot::Receiver<R>
3013 where
3014 F: FnOnce(RepositoryState, AsyncApp) -> Fut + 'static,
3015 Fut: Future<Output = R> + 'static,
3016 R: Send + 'static,
3017 {
3018 let (result_tx, result_rx) = futures::channel::oneshot::channel();
3019 let job_id = post_inc(&mut self.job_id);
3020 let this = self.this.clone();
3021 self.job_sender
3022 .unbounded_send(GitJob {
3023 key,
3024 job: Box::new(move |state, cx: &mut AsyncApp| {
3025 let job = job(state, cx.clone());
3026 cx.spawn(async move |cx| {
3027 if let Some(s) = status.clone() {
3028 this.update(cx, |this, cx| {
3029 this.active_jobs.insert(
3030 job_id,
3031 JobInfo {
3032 start: Instant::now(),
3033 message: s.clone(),
3034 },
3035 );
3036
3037 cx.notify();
3038 })
3039 .ok();
3040 }
3041 let result = job.await;
3042
3043 this.update(cx, |this, cx| {
3044 this.active_jobs.remove(&job_id);
3045 cx.notify();
3046 })
3047 .ok();
3048
3049 result_tx.send(result).ok();
3050 })
3051 }),
3052 })
3053 .ok();
3054 result_rx
3055 }
3056
3057 pub fn set_as_active_repository(&self, cx: &mut Context<Self>) {
3058 let Some(git_store) = self.git_store.upgrade() else {
3059 return;
3060 };
3061 let entity = cx.entity();
3062 git_store.update(cx, |git_store, cx| {
3063 let Some((&id, _)) = git_store
3064 .repositories
3065 .iter()
3066 .find(|(_, handle)| *handle == &entity)
3067 else {
3068 return;
3069 };
3070 git_store.active_repo_id = Some(id);
3071 cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
3072 });
3073 }
3074
3075 pub fn cached_status(&self) -> impl '_ + Iterator<Item = StatusEntry> {
3076 self.snapshot.status()
3077 }
3078
3079 pub fn repo_path_to_project_path(&self, path: &RepoPath, cx: &App) -> Option<ProjectPath> {
3080 let git_store = self.git_store.upgrade()?;
3081 let git_store_ref = git_store.read(cx);
3082 let worktree_store = git_store_ref.worktree_store.read(cx);
3083 let abs_path = self.snapshot.work_directory_abs_path.join(&path.0);
3084 let (worktree, relative_path) = worktree_store.find_worktree(abs_path, cx)?;
3085 let worktree_id = worktree.read(cx).id();
3086 Some(ProjectPath {
3087 worktree_id,
3088 path: relative_path.into(),
3089 })
3090 }
3091
3092 pub fn project_path_to_repo_path(&self, path: &ProjectPath, cx: &App) -> Option<RepoPath> {
3093 let git_store = self.git_store.upgrade()?;
3094 let git_store_ref = git_store.read(cx);
3095 let worktree_store = git_store_ref.worktree_store.read(cx);
3096 let abs_path = worktree_store.absolutize(path, cx)?;
3097 self.snapshot.abs_path_to_repo_path(&abs_path)
3098 }
3099
3100 pub fn contains_sub_repo(&self, other: &Entity<Self>, cx: &App) -> bool {
3101 other
3102 .read(cx)
3103 .snapshot
3104 .work_directory_abs_path
3105 .starts_with(&self.snapshot.work_directory_abs_path)
3106 }
3107
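    /// Returns this repository's commit message buffer, creating it on first
    /// use. Local repositories create the buffer directly; remote repositories
    /// request it from the host and wait for the replicated buffer. In both
    /// cases the "Git Commit" language is applied when a language registry is
    /// available.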
3108 pub fn open_commit_buffer(
3109 &mut self,
3110 languages: Option<Arc<LanguageRegistry>>,
3111 buffer_store: Entity<BufferStore>,
3112 cx: &mut Context<Self>,
3113 ) -> Task<Result<Entity<Buffer>>> {
3114 let id = self.id;
3115 if let Some(buffer) = self.commit_message_buffer.clone() {
3116 return Task::ready(Ok(buffer));
3117 }
3118 let this = cx.weak_entity();
3119
3120 let rx = self.send_job(None, move |state, mut cx| async move {
3121 let Some(this) = this.upgrade() else {
3122 bail!("git store was dropped");
3123 };
3124 match state {
3125 RepositoryState::Local { .. } => {
3126 this.update(&mut cx, |_, cx| {
3127 Self::open_local_commit_buffer(languages, buffer_store, cx)
3128 })?
3129 .await
3130 }
3131 RepositoryState::Remote { project_id, client } => {
3132 let request = client.request(proto::OpenCommitMessageBuffer {
3133 project_id: project_id.0,
3134 repository_id: id.to_proto(),
3135 });
3136 let response = request.await.context("requesting to open commit buffer")?;
3137 let buffer_id = BufferId::new(response.buffer_id)?;
3138 let buffer = buffer_store
3139 .update(&mut cx, |buffer_store, cx| {
3140 buffer_store.wait_for_remote_buffer(buffer_id, cx)
3141 })?
3142 .await?;
3143 if let Some(language_registry) = languages {
3144 let git_commit_language =
3145 language_registry.language_for_name("Git Commit").await?;
3146 buffer.update(&mut cx, |buffer, cx| {
3147 buffer.set_language(Some(git_commit_language), cx);
3148 })?;
3149 }
3150 this.update(&mut cx, |this, _| {
3151 this.commit_message_buffer = Some(buffer.clone());
3152 })?;
3153 Ok(buffer)
3154 }
3155 }
3156 });
3157
3158 cx.spawn(|_, _: &mut AsyncApp| async move { rx.await? })
3159 }
3160
3161 fn open_local_commit_buffer(
3162 language_registry: Option<Arc<LanguageRegistry>>,
3163 buffer_store: Entity<BufferStore>,
3164 cx: &mut Context<Self>,
3165 ) -> Task<Result<Entity<Buffer>>> {
3166 cx.spawn(async move |repository, cx| {
3167 let buffer = buffer_store
3168 .update(cx, |buffer_store, cx| buffer_store.create_buffer(cx))?
3169 .await?;
3170
3171 if let Some(language_registry) = language_registry {
3172 let git_commit_language = language_registry.language_for_name("Git Commit").await?;
3173 buffer.update(cx, |buffer, cx| {
3174 buffer.set_language(Some(git_commit_language), cx);
3175 })?;
3176 }
3177
3178 repository.update(cx, |repository, _| {
3179 repository.commit_message_buffer = Some(buffer.clone());
3180 })?;
3181 Ok(buffer)
3182 })
3183 }
3184
3185 pub fn checkout_files(
3186 &mut self,
3187 commit: &str,
3188 paths: Vec<RepoPath>,
3189 _cx: &mut App,
3190 ) -> oneshot::Receiver<Result<()>> {
3191 let commit = commit.to_string();
3192 let id = self.id;
3193
3194 self.send_job(
3195 Some(format!("git checkout {}", commit).into()),
3196 move |git_repo, _| async move {
3197 match git_repo {
3198 RepositoryState::Local {
3199 backend,
3200 environment,
3201 ..
3202 } => {
3203 backend
3204 .checkout_files(commit, paths, environment.clone())
3205 .await
3206 }
3207 RepositoryState::Remote { project_id, client } => {
3208 client
3209 .request(proto::GitCheckoutFiles {
3210 project_id: project_id.0,
3211 repository_id: id.to_proto(),
3212 commit,
3213 paths: paths
3214 .into_iter()
3215 .map(|p| p.to_string_lossy().to_string())
3216 .collect(),
3217 })
3218 .await?;
3219
3220 Ok(())
3221 }
3222 }
3223 },
3224 )
3225 }
3226
3227 pub fn reset(
3228 &mut self,
3229 commit: String,
3230 reset_mode: ResetMode,
3231 _cx: &mut App,
3232 ) -> oneshot::Receiver<Result<()>> {
3234 let id = self.id;
3235
3236 self.send_job(None, move |git_repo, _| async move {
3237 match git_repo {
3238 RepositoryState::Local {
3239 backend,
3240 environment,
3241 ..
3242 } => backend.reset(commit, reset_mode, environment).await,
3243 RepositoryState::Remote { project_id, client } => {
3244 client
3245 .request(proto::GitReset {
3246 project_id: project_id.0,
3247 repository_id: id.to_proto(),
3248 commit,
3249 mode: match reset_mode {
3250 ResetMode::Soft => git_reset::ResetMode::Soft.into(),
3251 ResetMode::Mixed => git_reset::ResetMode::Mixed.into(),
3252 },
3253 })
3254 .await?;
3255
3256 Ok(())
3257 }
3258 }
3259 })
3260 }
3261
3262 pub fn show(&mut self, commit: String) -> oneshot::Receiver<Result<CommitDetails>> {
3263 let id = self.id;
3264 self.send_job(None, move |git_repo, _cx| async move {
3265 match git_repo {
3266 RepositoryState::Local { backend, .. } => backend.show(commit).await,
3267 RepositoryState::Remote { project_id, client } => {
3268 let resp = client
3269 .request(proto::GitShow {
3270 project_id: project_id.0,
3271 repository_id: id.to_proto(),
3272 commit,
3273 })
3274 .await?;
3275
3276 Ok(CommitDetails {
3277 sha: resp.sha.into(),
3278 message: resp.message.into(),
3279 commit_timestamp: resp.commit_timestamp,
3280 author_email: resp.author_email.into(),
3281 author_name: resp.author_name.into(),
3282 })
3283 }
3284 }
3285 })
3286 }
3287
3288 pub fn load_commit_diff(&mut self, commit: String) -> oneshot::Receiver<Result<CommitDiff>> {
3289 let id = self.id;
3290 self.send_job(None, move |git_repo, cx| async move {
3291 match git_repo {
3292 RepositoryState::Local { backend, .. } => backend.load_commit(commit, cx).await,
3293 RepositoryState::Remote {
3294 client, project_id, ..
3295 } => {
3296 let response = client
3297 .request(proto::LoadCommitDiff {
3298 project_id: project_id.0,
3299 repository_id: id.to_proto(),
3300 commit,
3301 })
3302 .await?;
3303 Ok(CommitDiff {
3304 files: response
3305 .files
3306 .into_iter()
3307 .map(|file| CommitFile {
3308 path: Path::new(&file.path).into(),
3309 old_text: file.old_text,
3310 new_text: file.new_text,
3311 })
3312 .collect(),
3313 })
3314 }
3315 }
3316 })
3317 }
3318
3319 fn buffer_store(&self, cx: &App) -> Option<Entity<BufferStore>> {
3320 Some(self.git_store.upgrade()?.read(cx).buffer_store.clone())
3321 }
3322
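    /// Stages the given paths. Any open buffers for these paths whose files
    /// still exist on disk are saved first, so that what gets staged matches
    /// the editor contents.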
3323 pub fn stage_entries(
3324 &self,
3325 entries: Vec<RepoPath>,
3326 cx: &mut Context<Self>,
3327 ) -> Task<anyhow::Result<()>> {
3328 if entries.is_empty() {
3329 return Task::ready(Ok(()));
3330 }
3331 let id = self.id;
3332
3333 let mut save_futures = Vec::new();
3334 if let Some(buffer_store) = self.buffer_store(cx) {
3335 buffer_store.update(cx, |buffer_store, cx| {
3336 for path in &entries {
3337 let Some(project_path) = self.repo_path_to_project_path(path, cx) else {
3338 continue;
3339 };
3340 if let Some(buffer) = buffer_store.get_by_path(&project_path) {
3341 if buffer
3342 .read(cx)
3343 .file()
3344 .map_or(false, |file| file.disk_state().exists())
3345 {
3346 save_futures.push(buffer_store.save_buffer(buffer, cx));
3347 }
3348 }
3349 }
3350 })
3351 }
3352
3353 cx.spawn(async move |this, cx| {
3354 for save_future in save_futures {
3355 save_future.await?;
3356 }
3357
3358 this.update(cx, |this, _| {
3359 this.send_job(None, move |git_repo, _cx| async move {
3360 match git_repo {
3361 RepositoryState::Local {
3362 backend,
3363 environment,
3364 ..
3365 } => backend.stage_paths(entries, environment.clone()).await,
3366 RepositoryState::Remote { project_id, client } => {
3367 client
3368 .request(proto::Stage {
3369 project_id: project_id.0,
3370 repository_id: id.to_proto(),
3371 paths: entries
3372 .into_iter()
3373 .map(|repo_path| repo_path.as_ref().to_proto())
3374 .collect(),
3375 })
3376 .await
3377 .context("sending stage request")?;
3378
3379 Ok(())
3380 }
3381 }
3382 })
3383 })?
3384 .await??;
3385
3386 Ok(())
3387 })
3388 }
3389
3390 pub fn unstage_entries(
3391 &self,
3392 entries: Vec<RepoPath>,
3393 cx: &mut Context<Self>,
3394 ) -> Task<anyhow::Result<()>> {
3395 if entries.is_empty() {
3396 return Task::ready(Ok(()));
3397 }
3398 let id = self.id;
3399
3400 let mut save_futures = Vec::new();
3401 if let Some(buffer_store) = self.buffer_store(cx) {
3402 buffer_store.update(cx, |buffer_store, cx| {
3403 for path in &entries {
3404 let Some(project_path) = self.repo_path_to_project_path(path, cx) else {
3405 continue;
3406 };
3407 if let Some(buffer) = buffer_store.get_by_path(&project_path) {
3408 if buffer
3409 .read(cx)
3410 .file()
3411 .map_or(false, |file| file.disk_state().exists())
3412 {
3413 save_futures.push(buffer_store.save_buffer(buffer, cx));
3414 }
3415 }
3416 }
3417 })
3418 }
3419
3420 cx.spawn(async move |this, cx| {
3421 for save_future in save_futures {
3422 save_future.await?;
3423 }
3424
3425 this.update(cx, |this, _| {
3426 this.send_job(None, move |git_repo, _cx| async move {
3427 match git_repo {
3428 RepositoryState::Local {
3429 backend,
3430 environment,
3431 ..
3432 } => backend.unstage_paths(entries, environment).await,
3433 RepositoryState::Remote { project_id, client } => {
3434 client
3435 .request(proto::Unstage {
3436 project_id: project_id.0,
3437 repository_id: id.to_proto(),
3438 paths: entries
3439 .into_iter()
3440 .map(|repo_path| repo_path.as_ref().to_proto())
3441 .collect(),
3442 })
3443 .await
3444 .context("sending unstage request")?;
3445
3446 Ok(())
3447 }
3448 }
3449 })
3450 })?
3451 .await??;
3452
3453 Ok(())
3454 })
3455 }
3456
3457 pub fn stage_all(&self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
3458 let to_stage = self
3459 .cached_status()
3460 .filter(|entry| !entry.status.staging().is_fully_staged())
3461 .map(|entry| entry.repo_path.clone())
3462 .collect();
3463 self.stage_entries(to_stage, cx)
3464 }
3465
3466 pub fn unstage_all(&self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
3467 let to_unstage = self
3468 .cached_status()
3469 .filter(|entry| entry.status.staging().has_staged())
3470 .map(|entry| entry.repo_path.clone())
3471 .collect();
3472 self.unstage_entries(to_unstage, cx)
3473 }
3474
3475 pub fn commit(
3476 &mut self,
3477 message: SharedString,
3478 name_and_email: Option<(SharedString, SharedString)>,
3479 options: CommitOptions,
3480 _cx: &mut App,
3481 ) -> oneshot::Receiver<Result<()>> {
3482 let id = self.id;
3483
3484 self.send_job(Some("git commit".into()), move |git_repo, _cx| async move {
3485 match git_repo {
3486 RepositoryState::Local {
3487 backend,
3488 environment,
3489 ..
3490 } => {
3491 backend
3492 .commit(message, name_and_email, options, environment)
3493 .await
3494 }
3495 RepositoryState::Remote { project_id, client } => {
3496 let (name, email) = name_and_email.unzip();
3497 client
3498 .request(proto::Commit {
3499 project_id: project_id.0,
3500 repository_id: id.to_proto(),
3501 message: String::from(message),
3502 name: name.map(String::from),
3503 email: email.map(String::from),
3504 options: Some(proto::commit::CommitOptions {
3505 amend: options.amend,
3506 }),
3507 })
3508 .await
3509 .context("sending commit request")?;
3510
3511 Ok(())
3512 }
3513 }
3514 })
3515 }
3516
3517 pub fn fetch(
3518 &mut self,
3519 fetch_options: FetchOptions,
3520 askpass: AskPassDelegate,
3521 _cx: &mut App,
3522 ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
3523 let askpass_delegates = self.askpass_delegates.clone();
3524 let askpass_id = util::post_inc(&mut self.latest_askpass_id);
3525 let id = self.id;
3526
3527 self.send_job(Some("git fetch".into()), move |git_repo, cx| async move {
3528 match git_repo {
3529 RepositoryState::Local {
3530 backend,
3531 environment,
3532 ..
3533 } => backend.fetch(fetch_options, askpass, environment, cx).await,
3534 RepositoryState::Remote { project_id, client } => {
3535 askpass_delegates.lock().insert(askpass_id, askpass);
3536 let _defer = util::defer(|| {
3537 let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
3538 debug_assert!(askpass_delegate.is_some());
3539 });
3540
3541 let response = client
3542 .request(proto::Fetch {
3543 project_id: project_id.0,
3544 repository_id: id.to_proto(),
3545 askpass_id,
3546 remote: fetch_options.to_proto(),
3547 })
3548 .await
3549 .context("sending fetch request")?;
3550
3551 Ok(RemoteCommandOutput {
3552 stdout: response.stdout,
3553 stderr: response.stderr,
3554 })
3555 }
3556 }
3557 })
3558 }
3559
3560 pub fn push(
3561 &mut self,
3562 branch: SharedString,
3563 remote: SharedString,
3564 options: Option<PushOptions>,
3565 askpass: AskPassDelegate,
3566 cx: &mut Context<Self>,
3567 ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
3568 let askpass_delegates = self.askpass_delegates.clone();
3569 let askpass_id = util::post_inc(&mut self.latest_askpass_id);
3570 let id = self.id;
3571
3572 let args = options
3573 .map(|option| match option {
3574 PushOptions::SetUpstream => " --set-upstream",
3575 PushOptions::Force => " --force-with-lease",
3576 })
3577 .unwrap_or("");
3578
3579 let updates_tx = self
3580 .git_store()
3581 .and_then(|git_store| match &git_store.read(cx).state {
3582 GitStoreState::Local { downstream, .. } => downstream
3583 .as_ref()
3584 .map(|downstream| downstream.updates_tx.clone()),
3585 _ => None,
3586 });
3587
3588 let this = cx.weak_entity();
3589 self.send_job(
            Some(format!("git push{} {} {}", args, remote, branch).into()),
3591 move |git_repo, mut cx| async move {
3592 match git_repo {
3593 RepositoryState::Local {
3594 backend,
3595 environment,
3596 ..
3597 } => {
3598 let result = backend
3599 .push(
3600 branch.to_string(),
3601 remote.to_string(),
3602 options,
3603 askpass,
3604 environment.clone(),
3605 cx.clone(),
3606 )
3607 .await;
3608 if result.is_ok() {
3609 let branches = backend.branches().await?;
3610 let branch = branches.into_iter().find(|branch| branch.is_head);
3611 log::info!("head branch after scan is {branch:?}");
3612 let snapshot = this.update(&mut cx, |this, cx| {
3613 this.snapshot.branch = branch;
3614 let snapshot = this.snapshot.clone();
3615 cx.emit(RepositoryEvent::Updated {
3616 full_scan: false,
3617 new_instance: false,
3618 });
3619 snapshot
3620 })?;
3621 if let Some(updates_tx) = updates_tx {
3622 updates_tx
3623 .unbounded_send(DownstreamUpdate::UpdateRepository(snapshot))
3624 .ok();
3625 }
3626 }
3627 result
3628 }
3629 RepositoryState::Remote { project_id, client } => {
3630 askpass_delegates.lock().insert(askpass_id, askpass);
3631 let _defer = util::defer(|| {
3632 let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
3633 debug_assert!(askpass_delegate.is_some());
3634 });
3635 let response = client
3636 .request(proto::Push {
3637 project_id: project_id.0,
3638 repository_id: id.to_proto(),
3639 askpass_id,
3640 branch_name: branch.to_string(),
3641 remote_name: remote.to_string(),
3642 options: options.map(|options| match options {
3643 PushOptions::Force => proto::push::PushOptions::Force,
3644 PushOptions::SetUpstream => {
3645 proto::push::PushOptions::SetUpstream
3646 }
3647 }
3648 as i32),
3649 })
3650 .await
3651 .context("sending push request")?;
3652
3653 Ok(RemoteCommandOutput {
3654 stdout: response.stdout,
3655 stderr: response.stderr,
3656 })
3657 }
3658 }
3659 },
3660 )
3661 }
3662
3663 pub fn pull(
3664 &mut self,
3665 branch: SharedString,
3666 remote: SharedString,
3667 askpass: AskPassDelegate,
3668 _cx: &mut App,
3669 ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
3670 let askpass_delegates = self.askpass_delegates.clone();
3671 let askpass_id = util::post_inc(&mut self.latest_askpass_id);
3672 let id = self.id;
3673
3674 self.send_job(
3675 Some(format!("git pull {} {}", remote, branch).into()),
3676 move |git_repo, cx| async move {
3677 match git_repo {
3678 RepositoryState::Local {
3679 backend,
3680 environment,
3681 ..
3682 } => {
3683 backend
3684 .pull(
3685 branch.to_string(),
3686 remote.to_string(),
3687 askpass,
3688 environment.clone(),
3689 cx,
3690 )
3691 .await
3692 }
3693 RepositoryState::Remote { project_id, client } => {
3694 askpass_delegates.lock().insert(askpass_id, askpass);
3695 let _defer = util::defer(|| {
3696 let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
3697 debug_assert!(askpass_delegate.is_some());
3698 });
3699 let response = client
3700 .request(proto::Pull {
3701 project_id: project_id.0,
3702 repository_id: id.to_proto(),
3703 askpass_id,
3704 branch_name: branch.to_string(),
3705 remote_name: remote.to_string(),
3706 })
3707 .await
3708 .context("sending pull request")?;
3709
3710 Ok(RemoteCommandOutput {
3711 stdout: response.stdout,
3712 stderr: response.stderr,
3713 })
3714 }
3715 }
3716 },
3717 )
3718 }
3719
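    /// Queues a keyed job that writes `content` as the index text for `path`,
    /// either via the local git backend or via a `SetIndexText` request. The
    /// `GitJobKey::WriteIndex(path)` key lets the worker serialize or coalesce
    /// writes to the same path. When a `hunk_staging_operation_count` is given,
    /// it is recorded on the buffer's diff state after the write so later diff
    /// recalculations can tell whether they reflect this write.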
3720 fn spawn_set_index_text_job(
3721 &mut self,
3722 path: RepoPath,
3723 content: Option<String>,
3724 hunk_staging_operation_count: Option<usize>,
3725 cx: &mut Context<Self>,
3726 ) -> oneshot::Receiver<anyhow::Result<()>> {
3727 let id = self.id;
3728 let this = cx.weak_entity();
3729 let git_store = self.git_store.clone();
3730 self.send_keyed_job(
3731 Some(GitJobKey::WriteIndex(path.clone())),
3732 None,
3733 move |git_repo, mut cx| async move {
3734 log::debug!("start updating index text for buffer {}", path.display());
3735 match git_repo {
3736 RepositoryState::Local {
3737 backend,
3738 environment,
3739 ..
3740 } => {
3741 backend
3742 .set_index_text(path.clone(), content, environment.clone())
3743 .await?;
3744 }
3745 RepositoryState::Remote { project_id, client } => {
3746 client
3747 .request(proto::SetIndexText {
3748 project_id: project_id.0,
3749 repository_id: id.to_proto(),
3750 path: path.as_ref().to_proto(),
3751 text: content,
3752 })
3753 .await?;
3754 }
3755 }
3756 log::debug!("finish updating index text for buffer {}", path.display());
3757
3758 if let Some(hunk_staging_operation_count) = hunk_staging_operation_count {
3759 let project_path = this
3760 .read_with(&cx, |this, cx| this.repo_path_to_project_path(&path, cx))
3761 .ok()
3762 .flatten();
3763 git_store.update(&mut cx, |git_store, cx| {
3764 let buffer_id = git_store
3765 .buffer_store
3766 .read(cx)
3767 .get_by_path(&project_path?)?
3768 .read(cx)
3769 .remote_id();
3770 let diff_state = git_store.diffs.get(&buffer_id)?;
3771 diff_state.update(cx, |diff_state, _| {
3772 diff_state.hunk_staging_operation_count_as_of_write =
3773 hunk_staging_operation_count;
3774 });
3775 Some(())
3776 })?;
3777 }
3778 Ok(())
3779 },
3780 )
3781 }
3782
3783 pub fn get_remotes(
3784 &mut self,
3785 branch_name: Option<String>,
3786 ) -> oneshot::Receiver<Result<Vec<Remote>>> {
3787 let id = self.id;
3788 self.send_job(None, move |repo, _cx| async move {
3789 match repo {
3790 RepositoryState::Local { backend, .. } => backend.get_remotes(branch_name).await,
3791 RepositoryState::Remote { project_id, client } => {
3792 let response = client
3793 .request(proto::GetRemotes {
3794 project_id: project_id.0,
3795 repository_id: id.to_proto(),
3796 branch_name,
3797 })
3798 .await?;
3799
3800 let remotes = response
3801 .remotes
3802 .into_iter()
3803 .map(|remotes| git::repository::Remote {
3804 name: remotes.name.into(),
3805 })
3806 .collect();
3807
3808 Ok(remotes)
3809 }
3810 }
3811 })
3812 }
3813
3814 pub fn branches(&mut self) -> oneshot::Receiver<Result<Vec<Branch>>> {
3815 let id = self.id;
3816 self.send_job(None, move |repo, _| async move {
3817 match repo {
3818 RepositoryState::Local { backend, .. } => backend.branches().await,
3819 RepositoryState::Remote { project_id, client } => {
3820 let response = client
3821 .request(proto::GitGetBranches {
3822 project_id: project_id.0,
3823 repository_id: id.to_proto(),
3824 })
3825 .await?;
3826
3827 let branches = response
3828 .branches
3829 .into_iter()
3830 .map(|branch| proto_to_branch(&branch))
3831 .collect();
3832
3833 Ok(branches)
3834 }
3835 }
3836 })
3837 }
3838
3839 pub fn diff(&mut self, diff_type: DiffType, _cx: &App) -> oneshot::Receiver<Result<String>> {
3840 let id = self.id;
3841 self.send_job(None, move |repo, _cx| async move {
3842 match repo {
3843 RepositoryState::Local { backend, .. } => backend.diff(diff_type).await,
3844 RepositoryState::Remote { project_id, client } => {
3845 let response = client
3846 .request(proto::GitDiff {
3847 project_id: project_id.0,
3848 repository_id: id.to_proto(),
3849 diff_type: match diff_type {
3850 DiffType::HeadToIndex => {
3851 proto::git_diff::DiffType::HeadToIndex.into()
3852 }
3853 DiffType::HeadToWorktree => {
3854 proto::git_diff::DiffType::HeadToWorktree.into()
3855 }
3856 },
3857 })
3858 .await?;
3859
3860 Ok(response.diff)
3861 }
3862 }
3863 })
3864 }
3865
3866 pub fn create_branch(&mut self, branch_name: String) -> oneshot::Receiver<Result<()>> {
3867 let id = self.id;
3868 self.send_job(
3869 Some(format!("git switch -c {branch_name}").into()),
3870 move |repo, _cx| async move {
3871 match repo {
3872 RepositoryState::Local { backend, .. } => {
3873 backend.create_branch(branch_name).await
3874 }
3875 RepositoryState::Remote { project_id, client } => {
3876 client
3877 .request(proto::GitCreateBranch {
3878 project_id: project_id.0,
3879 repository_id: id.to_proto(),
3880 branch_name,
3881 })
3882 .await?;
3883
3884 Ok(())
3885 }
3886 }
3887 },
3888 )
3889 }
3890
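    /// Switches to an existing branch (the equivalent of `git switch <branch_name>`).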
3891 pub fn change_branch(&mut self, branch_name: String) -> oneshot::Receiver<Result<()>> {
3892 let id = self.id;
3893 self.send_job(
3894 Some(format!("git switch {branch_name}").into()),
3895 move |repo, _cx| async move {
3896 match repo {
3897 RepositoryState::Local { backend, .. } => {
3898 backend.change_branch(branch_name).await
3899 }
3900 RepositoryState::Remote { project_id, client } => {
3901 client
3902 .request(proto::GitChangeBranch {
3903 project_id: project_id.0,
3904 repository_id: id.to_proto(),
3905 branch_name,
3906 })
3907 .await?;
3908
3909 Ok(())
3910 }
3911 }
3912 },
3913 )
3914 }
3915
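    /// Determines which remote refs the current commit has already been pushed to
    /// (the `pushed_to` refs in the RPC response).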
3916 pub fn check_for_pushed_commits(&mut self) -> oneshot::Receiver<Result<Vec<SharedString>>> {
3917 let id = self.id;
3918 self.send_job(None, move |repo, _cx| async move {
3919 match repo {
3920 RepositoryState::Local { backend, .. } => backend.check_for_pushed_commit().await,
3921 RepositoryState::Remote { project_id, client } => {
3922 let response = client
3923 .request(proto::CheckForPushedCommits {
3924 project_id: project_id.0,
3925 repository_id: id.to_proto(),
3926 })
3927 .await?;
3928
3929 let branches = response.pushed_to.into_iter().map(Into::into).collect();
3930
3931 Ok(branches)
3932 }
3933 }
3934 })
3935 }
3936
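    /// Creates a checkpoint of the repository's current state. Checkpoints (and the
    /// restore/compare/diff operations below) are currently only supported for local
    /// repositories.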
3937 pub fn checkpoint(&mut self) -> oneshot::Receiver<Result<GitRepositoryCheckpoint>> {
3938 self.send_job(None, |repo, _cx| async move {
3939 match repo {
3940 RepositoryState::Local { backend, .. } => backend.checkpoint().await,
3941 RepositoryState::Remote { .. } => anyhow::bail!("checkpoints are not yet supported for remote repositories"),
3942 }
3943 })
3944 }
3945
3946 pub fn restore_checkpoint(
3947 &mut self,
3948 checkpoint: GitRepositoryCheckpoint,
3949 ) -> oneshot::Receiver<Result<()>> {
3950 self.send_job(None, move |repo, _cx| async move {
3951 match repo {
3952 RepositoryState::Local { backend, .. } => {
3953 backend.restore_checkpoint(checkpoint).await
3954 }
3955 RepositoryState::Remote { .. } => anyhow::bail!("restoring checkpoints is not yet supported for remote repositories"),
3956 }
3957 })
3958 }
3959
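    /// Applies an `UpdateRepository` message received from the host: replaces the
    /// branch, head commit, and merge-conflict state, and patches the status tree
    /// with the updated and removed entries.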
3960 pub(crate) fn apply_remote_update(
3961 &mut self,
3962 update: proto::UpdateRepository,
3963 is_new: bool,
3964 cx: &mut Context<Self>,
3965 ) -> Result<()> {
3966 let conflicted_paths = TreeSet::from_ordered_entries(
3967 update
3968 .current_merge_conflicts
3969 .into_iter()
3970 .map(|path| RepoPath(Path::new(&path).into())),
3971 );
3972 self.snapshot.branch = update.branch_summary.as_ref().map(proto_to_branch);
3973 self.snapshot.head_commit = update
3974 .head_commit_details
3975 .as_ref()
3976 .map(proto_to_commit_details);
3977
3978 self.snapshot.merge.conflicted_paths = conflicted_paths;
3979
3980 let edits = update
3981 .removed_statuses
3982 .into_iter()
3983 .map(|path| sum_tree::Edit::Remove(PathKey(FromProto::from_proto(path))))
3984 .chain(
3985 update
3986 .updated_statuses
3987 .into_iter()
3988 .filter_map(|updated_status| {
3989 Some(sum_tree::Edit::Insert(updated_status.try_into().log_err()?))
3990 }),
3991 )
3992 .collect::<Vec<_>>();
3993 self.snapshot.statuses_by_path.edit(edits, &());
3994 if update.is_last_update {
3995 self.snapshot.scan_id = update.scan_id;
3996 }
3997 cx.emit(RepositoryEvent::Updated {
3998 full_scan: true,
3999 new_instance: is_new,
4000 });
4001 Ok(())
4002 }
4003
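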
4004 pub fn compare_checkpoints(
4005 &mut self,
4006 left: GitRepositoryCheckpoint,
4007 right: GitRepositoryCheckpoint,
4008 ) -> oneshot::Receiver<Result<bool>> {
4009 self.send_job(None, move |repo, _cx| async move {
4010 match repo {
4011 RepositoryState::Local { backend, .. } => {
4012 backend.compare_checkpoints(left, right).await
4013 }
4014 RepositoryState::Remote { .. } => anyhow::bail!("comparing checkpoints is not yet supported for remote repositories"),
4015 }
4016 })
4017 }
4018
4019 pub fn diff_checkpoints(
4020 &mut self,
4021 base_checkpoint: GitRepositoryCheckpoint,
4022 target_checkpoint: GitRepositoryCheckpoint,
4023 ) -> oneshot::Receiver<Result<String>> {
4024 self.send_job(None, move |repo, _cx| async move {
4025 match repo {
4026 RepositoryState::Local { backend, .. } => {
4027 backend
4028 .diff_checkpoints(base_checkpoint, target_checkpoint)
4029 .await
4030 }
4031 RepositoryState::Remote { .. } => anyhow::bail!("diffing checkpoints is not yet supported for remote repositories"),
4032 }
4033 })
4034 }
4035
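    /// Schedules a full git status scan on this repository's job queue. The job is
    /// keyed with `GitJobKey::ReloadGitState`, so if several scans are queued in
    /// quick succession only the most recently queued one actually runs.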
4036 fn schedule_scan(
4037 &mut self,
4038 updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
4039 cx: &mut Context<Self>,
4040 ) {
4041 let this = cx.weak_entity();
4042 let _ = self.send_keyed_job(
4043 Some(GitJobKey::ReloadGitState),
4044 None,
4045 |state, mut cx| async move {
4046 log::debug!("run scheduled git status scan");
4047
4048 let Some(this) = this.upgrade() else {
4049 return Ok(());
4050 };
4051 let RepositoryState::Local { backend, .. } = state else {
4052 bail!("not a local repository")
4053 };
4054 let (snapshot, events) = this
4055 .read_with(&mut cx, |this, _| {
4056 compute_snapshot(
4057 this.id,
4058 this.work_directory_abs_path.clone(),
4059 this.snapshot.clone(),
4060 backend.clone(),
4061 )
4062 })?
4063 .await?;
4064 this.update(&mut cx, |this, cx| {
4065 this.snapshot = snapshot.clone();
4066 for event in events {
4067 cx.emit(event);
4068 }
4069 })?;
4070 if let Some(updates_tx) = updates_tx {
4071 updates_tx
4072 .unbounded_send(DownstreamUpdate::UpdateRepository(snapshot))
4073 .ok();
4074 }
4075 Ok(())
4076 },
4077 );
4078 }
4079
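    /// Spawns the background task that owns the local git backend and executes queued
    /// `GitJob`s one at a time, after loading the work directory's environment and
    /// registering any additional git hosting providers.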
4080 fn spawn_local_git_worker(
4081 work_directory_abs_path: Arc<Path>,
4082 dot_git_abs_path: Arc<Path>,
4083 _repository_dir_abs_path: Arc<Path>,
4084 _common_dir_abs_path: Arc<Path>,
4085 project_environment: WeakEntity<ProjectEnvironment>,
4086 fs: Arc<dyn Fs>,
4087 cx: &mut Context<Self>,
4088 ) -> mpsc::UnboundedSender<GitJob> {
4089 let (job_tx, mut job_rx) = mpsc::unbounded::<GitJob>();
4090
4091 cx.spawn(async move |_, cx| {
4092 let environment = project_environment
4093 .upgrade()
4094 .context("missing project environment")?
4095 .update(cx, |project_environment, cx| {
4096 project_environment.get_directory_environment(work_directory_abs_path.clone(), cx)
4097 })?
4098 .await
4099 .unwrap_or_else(|| {
4100 log::error!("failed to get working directory environment for repository {work_directory_abs_path:?}");
4101 HashMap::default()
4102 });
4103 let backend = cx
4104 .background_spawn(async move {
4105 fs.open_repo(&dot_git_abs_path)
4106 .with_context(|| format!("opening repository at {dot_git_abs_path:?}"))
4107 })
4108 .await?;
4109
4110 if let Some(git_hosting_provider_registry) =
4111 cx.update(|cx| GitHostingProviderRegistry::try_global(cx))?
4112 {
4113 git_hosting_providers::register_additional_providers(
4114 git_hosting_provider_registry,
4115 backend.clone(),
4116 );
4117 }
4118
4119 let state = RepositoryState::Local {
4120 backend,
4121 environment: Arc::new(environment),
4122 };
4123 let mut jobs = VecDeque::new();
4124 loop {
4125 while let Ok(Some(next_job)) = job_rx.try_next() {
4126 jobs.push_back(next_job);
4127 }
4128
4129 if let Some(job) = jobs.pop_front() {
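        // Skip this job if another job with the same key is queued behind it;
        // the later job supersedes this one.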
4130 if let Some(current_key) = &job.key {
4131 if jobs
4132 .iter()
4133 .any(|other_job| other_job.key.as_ref() == Some(current_key))
4134 {
4135 continue;
4136 }
4137 }
4138 (job.job)(state.clone(), cx).await;
4139 } else if let Some(job) = job_rx.next().await {
4140 jobs.push_back(job);
4141 } else {
4142 break;
4143 }
4144 }
4145 anyhow::Ok(())
4146 })
4147 .detach_and_log_err(cx);
4148
4149 job_tx
4150 }
4151
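    /// Spawns the background task that executes queued `GitJob`s against a remote
    /// project over RPC, using the same keyed-job coalescing as the local worker.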
4152 fn spawn_remote_git_worker(
4153 project_id: ProjectId,
4154 client: AnyProtoClient,
4155 cx: &mut Context<Self>,
4156 ) -> mpsc::UnboundedSender<GitJob> {
4157 let (job_tx, mut job_rx) = mpsc::unbounded::<GitJob>();
4158
4159 cx.spawn(async move |_, cx| {
4160 let state = RepositoryState::Remote { project_id, client };
4161 let mut jobs = VecDeque::new();
4162 loop {
4163 while let Ok(Some(next_job)) = job_rx.try_next() {
4164 jobs.push_back(next_job);
4165 }
4166
4167 if let Some(job) = jobs.pop_front() {
4168 if let Some(current_key) = &job.key {
4169 if jobs
4170 .iter()
4171 .any(|other_job| other_job.key.as_ref() == Some(current_key))
4172 {
4173 continue;
4174 }
4175 }
4176 (job.job)(state.clone(), cx).await;
4177 } else if let Some(job) = job_rx.next().await {
4178 jobs.push_back(job);
4179 } else {
4180 break;
4181 }
4182 }
4183 anyhow::Ok(())
4184 })
4185 .detach_and_log_err(cx);
4186
4187 job_tx
4188 }
4189
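    /// Loads the staged (index) text for a file, either from the local backend or via
    /// an `OpenUnstagedDiff` request for remote projects.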
4190 fn load_staged_text(
4191 &mut self,
4192 buffer_id: BufferId,
4193 repo_path: RepoPath,
4194 cx: &App,
4195 ) -> Task<Result<Option<String>>> {
4196 let rx = self.send_job(None, move |state, _| async move {
4197 match state {
4198 RepositoryState::Local { backend, .. } => {
4199 anyhow::Ok(backend.load_index_text(repo_path).await)
4200 }
4201 RepositoryState::Remote { project_id, client } => {
4202 let response = client
4203 .request(proto::OpenUnstagedDiff {
4204 project_id: project_id.to_proto(),
4205 buffer_id: buffer_id.to_proto(),
4206 })
4207 .await?;
4208 Ok(response.staged_text)
4209 }
4210 }
4211 });
4212 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
4213 }
4214
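    /// Loads the committed (HEAD) and staged (index) texts for a file, collapsing them
    /// into `DiffBasesChange::SetBoth` when the two are identical.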
4215 fn load_committed_text(
4216 &mut self,
4217 buffer_id: BufferId,
4218 repo_path: RepoPath,
4219 cx: &App,
4220 ) -> Task<Result<DiffBasesChange>> {
4221 let rx = self.send_job(None, move |state, _| async move {
4222 match state {
4223 RepositoryState::Local { backend, .. } => {
4224 let committed_text = backend.load_committed_text(repo_path.clone()).await;
4225 let staged_text = backend.load_index_text(repo_path).await;
4226 let diff_bases_change = if committed_text == staged_text {
4227 DiffBasesChange::SetBoth(committed_text)
4228 } else {
4229 DiffBasesChange::SetEach {
4230 index: staged_text,
4231 head: committed_text,
4232 }
4233 };
4234 anyhow::Ok(diff_bases_change)
4235 }
4236 RepositoryState::Remote { project_id, client } => {
4237 use proto::open_uncommitted_diff_response::Mode;
4238
4239 let response = client
4240 .request(proto::OpenUncommittedDiff {
4241 project_id: project_id.to_proto(),
4242 buffer_id: buffer_id.to_proto(),
4243 })
4244 .await?;
4245 let mode = Mode::from_i32(response.mode).context("Invalid mode")?;
4246 let bases = match mode {
4247 Mode::IndexMatchesHead => DiffBasesChange::SetBoth(response.committed_text),
4248 Mode::IndexAndHead => DiffBasesChange::SetEach {
4249 head: response.committed_text,
4250 index: response.staged_text,
4251 },
4252 };
4253 Ok(bases)
4254 }
4255 }
4256 });
4257
4258 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
4259 }
4260
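    /// Records that the given paths may have changed and schedules a keyed job to
    /// refresh their git statuses. Only entries whose status actually changed are
    /// written into the snapshot and forwarded downstream.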
4261 fn paths_changed(
4262 &mut self,
4263 paths: Vec<RepoPath>,
4264 updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
4265 cx: &mut Context<Self>,
4266 ) {
4267 self.paths_needing_status_update.extend(paths);
4268
4269 let this = cx.weak_entity();
4270 let _ = self.send_keyed_job(
4271 Some(GitJobKey::RefreshStatuses),
4272 None,
4273 |state, mut cx| async move {
4274 let (prev_snapshot, mut changed_paths) = this.update(&mut cx, |this, _| {
4275 (
4276 this.snapshot.clone(),
4277 mem::take(&mut this.paths_needing_status_update),
4278 )
4279 })?;
4280 let RepositoryState::Local { backend, .. } = state else {
4281 bail!("not a local repository")
4282 };
4283
4284 let paths = changed_paths.iter().cloned().collect::<Vec<_>>();
4285 let statuses = backend.status(&paths).await?;
4286
4287 let changed_path_statuses = cx
4288 .background_spawn(async move {
4289 let mut changed_path_statuses = Vec::new();
4290 let prev_statuses = prev_snapshot.statuses_by_path.clone();
4291 let mut cursor = prev_statuses.cursor::<PathProgress>(&());
4292
4293 for (repo_path, status) in &*statuses.entries {
4294 changed_paths.remove(repo_path);
4295 if cursor.seek_forward(&PathTarget::Path(repo_path), Bias::Left, &()) {
4296 if cursor.item().is_some_and(|entry| entry.status == *status) {
4297 continue;
4298 }
4299 }
4300
4301 changed_path_statuses.push(Edit::Insert(StatusEntry {
4302 repo_path: repo_path.clone(),
4303 status: *status,
4304 }));
4305 }
4306 let mut cursor = prev_statuses.cursor::<PathProgress>(&());
4307 for path in changed_paths.into_iter() {
4308 if cursor.seek_forward(&PathTarget::Path(&path), Bias::Left, &()) {
4309 changed_path_statuses.push(Edit::Remove(PathKey(path.0)));
4310 }
4311 }
4312 changed_path_statuses
4313 })
4314 .await;
4315
4316 this.update(&mut cx, |this, cx| {
4317 if !changed_path_statuses.is_empty() {
4318 this.snapshot
4319 .statuses_by_path
4320 .edit(changed_path_statuses, &());
4321 this.snapshot.scan_id += 1;
4322 if let Some(updates_tx) = updates_tx {
4323 updates_tx
4324 .unbounded_send(DownstreamUpdate::UpdateRepository(
4325 this.snapshot.clone(),
4326 ))
4327 .ok();
4328 }
4329 }
4330 cx.emit(RepositoryEvent::Updated {
4331 full_scan: false,
4332 new_instance: false,
4333 });
4334 })
4335 },
4336 );
4337 }
4338
4339 /// Returns the currently running git command, if any, along with when it started.
4340 pub fn current_job(&self) -> Option<JobInfo> {
4341 self.active_jobs.values().next().cloned()
4342 }
4343
4344 pub fn barrier(&mut self) -> oneshot::Receiver<()> {
4345 self.send_job(None, |_, _| async {})
4346 }
4347}
4348
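/// Builds a permalink for a file that lives in cargo's registry source cache rather
/// than in a local git checkout. It walks up from `path` looking for the
/// `.cargo_vcs_info.json` file that cargo writes when packaging a crate, which records
/// the upstream commit SHA and the crate's path within its repository, and combines
/// that with the `package.repository` URL from the adjacent `Cargo.toml`.
///
/// The fields read from `.cargo_vcs_info.json` are roughly
/// `{ "git": { "sha1": "<commit>" }, "path_in_vcs": "<subdir>" }`; any other fields
/// are ignored by the deserialization structs below.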
4349fn get_permalink_in_rust_registry_src(
4350 provider_registry: Arc<GitHostingProviderRegistry>,
4351 path: PathBuf,
4352 selection: Range<u32>,
4353) -> Result<url::Url> {
4354 #[derive(Deserialize)]
4355 struct CargoVcsGit {
4356 sha1: String,
4357 }
4358
4359 #[derive(Deserialize)]
4360 struct CargoVcsInfo {
4361 git: CargoVcsGit,
4362 path_in_vcs: String,
4363 }
4364
4365 #[derive(Deserialize)]
4366 struct CargoPackage {
4367 repository: String,
4368 }
4369
4370 #[derive(Deserialize)]
4371 struct CargoToml {
4372 package: CargoPackage,
4373 }
4374
4375 let Some((dir, cargo_vcs_info_json)) = path.ancestors().skip(1).find_map(|dir| {
4376 let json = std::fs::read_to_string(dir.join(".cargo_vcs_info.json")).ok()?;
4377 Some((dir, json))
4378 }) else {
4379 bail!("No .cargo_vcs_info.json found in parent directories")
4380 };
4381 let cargo_vcs_info = serde_json::from_str::<CargoVcsInfo>(&cargo_vcs_info_json)?;
4382 let cargo_toml = std::fs::read_to_string(dir.join("Cargo.toml"))?;
4383 let manifest = toml::from_str::<CargoToml>(&cargo_toml)?;
4384 let (provider, remote) = parse_git_remote_url(provider_registry, &manifest.package.repository)
4385 .context("parsing package.repository field of manifest")?;
4386 let path = PathBuf::from(cargo_vcs_info.path_in_vcs).join(path.strip_prefix(dir).unwrap());
4387 let permalink = provider.build_permalink(
4388 remote,
4389 BuildPermalinkParams {
4390 sha: &cargo_vcs_info.git.sha1,
4391 path: &path.to_string_lossy(),
4392 selection: Some(selection),
4393 },
4394 );
4395 Ok(permalink)
4396}
4397
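/// Converts a `git::blame::Blame` (or `None`, when no blame is available) into the
/// `BlameBufferResponse` protobuf message, flattening each blame entry and commit message.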
4398fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
4399 let Some(blame) = blame else {
4400 return proto::BlameBufferResponse {
4401 blame_response: None,
4402 };
4403 };
4404
4405 let entries = blame
4406 .entries
4407 .into_iter()
4408 .map(|entry| proto::BlameEntry {
4409 sha: entry.sha.as_bytes().into(),
4410 start_line: entry.range.start,
4411 end_line: entry.range.end,
4412 original_line_number: entry.original_line_number,
4413 author: entry.author.clone(),
4414 author_mail: entry.author_mail.clone(),
4415 author_time: entry.author_time,
4416 author_tz: entry.author_tz.clone(),
4417 committer: entry.committer_name.clone(),
4418 committer_mail: entry.committer_email.clone(),
4419 committer_time: entry.committer_time,
4420 committer_tz: entry.committer_tz.clone(),
4421 summary: entry.summary.clone(),
4422 previous: entry.previous.clone(),
4423 filename: entry.filename.clone(),
4424 })
4425 .collect::<Vec<_>>();
4426
4427 let messages = blame
4428 .messages
4429 .into_iter()
4430 .map(|(oid, message)| proto::CommitMessage {
4431 oid: oid.as_bytes().into(),
4432 message,
4433 })
4434 .collect::<Vec<_>>();
4435
4436 proto::BlameBufferResponse {
4437 blame_response: Some(proto::blame_buffer_response::BlameResponse {
4438 entries,
4439 messages,
4440 remote_url: blame.remote_url,
4441 }),
4442 }
4443}
4444
4445fn deserialize_blame_buffer_response(
4446 response: proto::BlameBufferResponse,
4447) -> Option<git::blame::Blame> {
4448 let response = response.blame_response?;
4449 let entries = response
4450 .entries
4451 .into_iter()
4452 .filter_map(|entry| {
4453 Some(git::blame::BlameEntry {
4454 sha: git::Oid::from_bytes(&entry.sha).ok()?,
4455 range: entry.start_line..entry.end_line,
4456 original_line_number: entry.original_line_number,
4457 committer_name: entry.committer,
4458 committer_time: entry.committer_time,
4459 committer_tz: entry.committer_tz,
4460 committer_email: entry.committer_mail,
4461 author: entry.author,
4462 author_mail: entry.author_mail,
4463 author_time: entry.author_time,
4464 author_tz: entry.author_tz,
4465 summary: entry.summary,
4466 previous: entry.previous,
4467 filename: entry.filename,
4468 })
4469 })
4470 .collect::<Vec<_>>();
4471
4472 let messages = response
4473 .messages
4474 .into_iter()
4475 .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
4476 .collect::<HashMap<_, _>>();
4477
4478 Some(Blame {
4479 entries,
4480 messages,
4481 remote_url: response.remote_url,
4482 })
4483}
4484
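/// Converts a `git::repository::Branch` into its protobuf representation.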
4485fn branch_to_proto(branch: &git::repository::Branch) -> proto::Branch {
4486 proto::Branch {
4487 is_head: branch.is_head,
4488 ref_name: branch.ref_name.to_string(),
4489 unix_timestamp: branch
4490 .most_recent_commit
4491 .as_ref()
4492 .map(|commit| commit.commit_timestamp as u64),
4493 upstream: branch.upstream.as_ref().map(|upstream| proto::GitUpstream {
4494 ref_name: upstream.ref_name.to_string(),
4495 tracking: upstream
4496 .tracking
4497 .status()
4498 .map(|upstream| proto::UpstreamTracking {
4499 ahead: upstream.ahead as u64,
4500 behind: upstream.behind as u64,
4501 }),
4502 }),
4503 most_recent_commit: branch
4504 .most_recent_commit
4505 .as_ref()
4506 .map(|commit| proto::CommitSummary {
4507 sha: commit.sha.to_string(),
4508 subject: commit.subject.to_string(),
4509 commit_timestamp: commit.commit_timestamp,
4510 }),
4511 }
4512}
4513
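/// Converts a protobuf branch back into a `git::repository::Branch`. The proto message
/// carries no parent information, so `has_parent` is assumed to be true for the most
/// recent commit.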
4514fn proto_to_branch(proto: &proto::Branch) -> git::repository::Branch {
4515 git::repository::Branch {
4516 is_head: proto.is_head,
4517 ref_name: proto.ref_name.clone().into(),
4518 upstream: proto
4519 .upstream
4520 .as_ref()
4521 .map(|upstream| git::repository::Upstream {
4522 ref_name: upstream.ref_name.to_string().into(),
4523 tracking: upstream
4524 .tracking
4525 .as_ref()
4526 .map(|tracking| {
4527 git::repository::UpstreamTracking::Tracked(UpstreamTrackingStatus {
4528 ahead: tracking.ahead as u32,
4529 behind: tracking.behind as u32,
4530 })
4531 })
4532 .unwrap_or(git::repository::UpstreamTracking::Gone),
4533 }),
4534 most_recent_commit: proto.most_recent_commit.as_ref().map(|commit| {
4535 git::repository::CommitSummary {
4536 sha: commit.sha.to_string().into(),
4537 subject: commit.subject.to_string().into(),
4538 commit_timestamp: commit.commit_timestamp,
4539 has_parent: true,
4540 }
4541 }),
4542 }
4543}
4544
4545fn commit_details_to_proto(commit: &CommitDetails) -> proto::GitCommitDetails {
4546 proto::GitCommitDetails {
4547 sha: commit.sha.to_string(),
4548 message: commit.message.to_string(),
4549 commit_timestamp: commit.commit_timestamp,
4550 author_email: commit.author_email.to_string(),
4551 author_name: commit.author_name.to_string(),
4552 }
4553}
4554
4555fn proto_to_commit_details(proto: &proto::GitCommitDetails) -> CommitDetails {
4556 CommitDetails {
4557 sha: proto.sha.clone().into(),
4558 message: proto.message.clone().into(),
4559 commit_timestamp: proto.commit_timestamp,
4560 author_email: proto.author_email.clone().into(),
4561 author_name: proto.author_name.clone().into(),
4562 }
4563}
4564
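/// Rebuilds a `RepositorySnapshot` from the git backend: current branch, statuses for
/// the whole work directory, merge state, and the HEAD commit. Also returns the
/// `RepositoryEvent`s to emit for whatever changed relative to `prev_snapshot`.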
4565async fn compute_snapshot(
4566 id: RepositoryId,
4567 work_directory_abs_path: Arc<Path>,
4568 prev_snapshot: RepositorySnapshot,
4569 backend: Arc<dyn GitRepository>,
4570) -> Result<(RepositorySnapshot, Vec<RepositoryEvent>)> {
4571 let mut events = Vec::new();
4572 let branches = backend.branches().await?;
4573 let branch = branches.into_iter().find(|branch| branch.is_head);
4574 let statuses = backend
4575 .status(std::slice::from_ref(&WORK_DIRECTORY_REPO_PATH))
4576 .await?;
4577 let statuses_by_path = SumTree::from_iter(
4578 statuses
4579 .entries
4580 .iter()
4581 .map(|(repo_path, status)| StatusEntry {
4582 repo_path: repo_path.clone(),
4583 status: *status,
4584 }),
4585 &(),
4586 );
4587 let (merge_details, merge_heads_changed) =
4588 MergeDetails::load(&backend, &statuses_by_path, &prev_snapshot).await?;
4589 log::debug!("new merge details (changed={merge_heads_changed:?}): {merge_details:?}");
4590
4591 if merge_heads_changed
4592 || branch != prev_snapshot.branch
4593 || statuses_by_path != prev_snapshot.statuses_by_path
4594 {
4595 events.push(RepositoryEvent::Updated {
4596 full_scan: true,
4597 new_instance: false,
4598 });
4599 }
4600
4601 // Cache merge conflict paths so that staging or unstaging files doesn't change
4602 // them; they are only refreshed when the merge heads change (at commit time, etc.).
4603 if merge_heads_changed {
4604 events.push(RepositoryEvent::MergeHeadsChanged);
4605 }
4606
4607 // Also load the HEAD commit directly; useful when `branch` is None, e.g. in a detached HEAD state.
4608 let head_commit = match backend.head_sha().await {
4609 Some(head_sha) => backend.show(head_sha).await.log_err(),
4610 None => None,
4611 };
4612
4613 let snapshot = RepositorySnapshot {
4614 id,
4615 statuses_by_path,
4616 work_directory_abs_path,
4617 scan_id: prev_snapshot.scan_id + 1,
4618 branch,
4619 head_commit,
4620 merge: merge_details,
4621 };
4622
4623 Ok((snapshot, events))
4624}
4625
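/// Decodes a file status from the wire format, preferring the structured
/// `GitFileStatus` variant and falling back to the flat `simple_status` code when no
/// variant is present.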
4626fn status_from_proto(
4627 simple_status: i32,
4628 status: Option<proto::GitFileStatus>,
4629) -> anyhow::Result<FileStatus> {
4630 use proto::git_file_status::Variant;
4631
4632 let Some(variant) = status.and_then(|status| status.variant) else {
4633 let code = proto::GitStatus::from_i32(simple_status)
4634 .with_context(|| format!("Invalid git status code: {simple_status}"))?;
4635 let result = match code {
4636 proto::GitStatus::Added => TrackedStatus {
4637 worktree_status: StatusCode::Added,
4638 index_status: StatusCode::Unmodified,
4639 }
4640 .into(),
4641 proto::GitStatus::Modified => TrackedStatus {
4642 worktree_status: StatusCode::Modified,
4643 index_status: StatusCode::Unmodified,
4644 }
4645 .into(),
4646 proto::GitStatus::Conflict => UnmergedStatus {
4647 first_head: UnmergedStatusCode::Updated,
4648 second_head: UnmergedStatusCode::Updated,
4649 }
4650 .into(),
4651 proto::GitStatus::Deleted => TrackedStatus {
4652 worktree_status: StatusCode::Deleted,
4653 index_status: StatusCode::Unmodified,
4654 }
4655 .into(),
4656 _ => anyhow::bail!("Invalid code for simple status: {simple_status}"),
4657 };
4658 return Ok(result);
4659 };
4660
4661 let result = match variant {
4662 Variant::Untracked(_) => FileStatus::Untracked,
4663 Variant::Ignored(_) => FileStatus::Ignored,
4664 Variant::Unmerged(unmerged) => {
4665 let [first_head, second_head] =
4666 [unmerged.first_head, unmerged.second_head].map(|head| {
4667 let code = proto::GitStatus::from_i32(head)
4668 .with_context(|| format!("Invalid git status code: {head}"))?;
4669 let result = match code {
4670 proto::GitStatus::Added => UnmergedStatusCode::Added,
4671 proto::GitStatus::Updated => UnmergedStatusCode::Updated,
4672 proto::GitStatus::Deleted => UnmergedStatusCode::Deleted,
4673 _ => anyhow::bail!("Invalid code for unmerged status: {code:?}"),
4674 };
4675 Ok(result)
4676 });
4677 let [first_head, second_head] = [first_head?, second_head?];
4678 UnmergedStatus {
4679 first_head,
4680 second_head,
4681 }
4682 .into()
4683 }
4684 Variant::Tracked(tracked) => {
4685 let [index_status, worktree_status] = [tracked.index_status, tracked.worktree_status]
4686 .map(|status| {
4687 let code = proto::GitStatus::from_i32(status)
4688 .with_context(|| format!("Invalid git status code: {status}"))?;
4689 let result = match code {
4690 proto::GitStatus::Modified => StatusCode::Modified,
4691 proto::GitStatus::TypeChanged => StatusCode::TypeChanged,
4692 proto::GitStatus::Added => StatusCode::Added,
4693 proto::GitStatus::Deleted => StatusCode::Deleted,
4694 proto::GitStatus::Renamed => StatusCode::Renamed,
4695 proto::GitStatus::Copied => StatusCode::Copied,
4696 proto::GitStatus::Unmodified => StatusCode::Unmodified,
4697 _ => anyhow::bail!("Invalid code for tracked status: {code:?}"),
4698 };
4699 Ok(result)
4700 });
4701 let [index_status, worktree_status] = [index_status?, worktree_status?];
4702 TrackedStatus {
4703 index_status,
4704 worktree_status,
4705 }
4706 .into()
4707 }
4708 };
4709 Ok(result)
4710}
4711
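/// Encodes a `FileStatus` as the structured protobuf variant (untracked, ignored,
/// unmerged, or tracked).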
4712fn status_to_proto(status: FileStatus) -> proto::GitFileStatus {
4713 use proto::git_file_status::{Tracked, Unmerged, Variant};
4714
4715 let variant = match status {
4716 FileStatus::Untracked => Variant::Untracked(Default::default()),
4717 FileStatus::Ignored => Variant::Ignored(Default::default()),
4718 FileStatus::Unmerged(UnmergedStatus {
4719 first_head,
4720 second_head,
4721 }) => Variant::Unmerged(Unmerged {
4722 first_head: unmerged_status_to_proto(first_head),
4723 second_head: unmerged_status_to_proto(second_head),
4724 }),
4725 FileStatus::Tracked(TrackedStatus {
4726 index_status,
4727 worktree_status,
4728 }) => Variant::Tracked(Tracked {
4729 index_status: tracked_status_to_proto(index_status),
4730 worktree_status: tracked_status_to_proto(worktree_status),
4731 }),
4732 };
4733 proto::GitFileStatus {
4734 variant: Some(variant),
4735 }
4736}
4737
4738fn unmerged_status_to_proto(code: UnmergedStatusCode) -> i32 {
4739 match code {
4740 UnmergedStatusCode::Added => proto::GitStatus::Added as _,
4741 UnmergedStatusCode::Deleted => proto::GitStatus::Deleted as _,
4742 UnmergedStatusCode::Updated => proto::GitStatus::Updated as _,
4743 }
4744}
4745
4746fn tracked_status_to_proto(code: StatusCode) -> i32 {
4747 match code {
4748 StatusCode::Added => proto::GitStatus::Added as _,
4749 StatusCode::Deleted => proto::GitStatus::Deleted as _,
4750 StatusCode::Modified => proto::GitStatus::Modified as _,
4751 StatusCode::Renamed => proto::GitStatus::Renamed as _,
4752 StatusCode::TypeChanged => proto::GitStatus::TypeChanged as _,
4753 StatusCode::Copied => proto::GitStatus::Copied as _,
4754 StatusCode::Unmodified => proto::GitStatus::Unmodified as _,
4755 }
4756}