1pub mod git_traversal;
2
3use crate::{
4 ProjectEnvironment, ProjectItem, ProjectPath,
5 buffer_store::{BufferStore, BufferStoreEvent},
6 worktree_store::{WorktreeStore, WorktreeStoreEvent},
7};
8use anyhow::{Context as _, Result, anyhow, bail};
9use askpass::AskPassDelegate;
10use buffer_diff::{BufferDiff, BufferDiffEvent};
11use client::ProjectId;
12use collections::HashMap;
13use fs::Fs;
14use futures::{
15 FutureExt as _, StreamExt as _,
16 channel::{mpsc, oneshot},
17 future::{self, Shared},
18};
19use git::{
20 BuildPermalinkParams, GitHostingProviderRegistry, WORK_DIRECTORY_REPO_PATH,
21 blame::Blame,
22 parse_git_remote_url,
23 repository::{
24 Branch, CommitDetails, CommitDiff, CommitFile, CommitOptions, DiffType, GitRepository,
25 GitRepositoryCheckpoint, PushOptions, Remote, RemoteCommandOutput, RepoPath, ResetMode,
26 UpstreamTrackingStatus,
27 },
28 status::{
29 FileStatus, GitSummary, StatusCode, TrackedStatus, UnmergedStatus, UnmergedStatusCode,
30 },
31};
32use gpui::{
33 App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Subscription, Task,
34 WeakEntity,
35};
36use language::{
37 Buffer, BufferEvent, Language, LanguageRegistry,
38 proto::{deserialize_version, serialize_version},
39};
40use parking_lot::Mutex;
41use postage::stream::Stream as _;
42use rpc::{
43 AnyProtoClient, TypedEnvelope,
44 proto::{self, FromProto, SSH_PROJECT_ID, ToProto, git_reset, split_repository_update},
45};
46use serde::Deserialize;
47use std::{
48 cmp::Ordering,
49 collections::{BTreeSet, VecDeque},
50 future::Future,
51 mem,
52 ops::Range,
53 path::{Path, PathBuf},
54 sync::{
55 Arc,
56 atomic::{self, AtomicU64},
57 },
58 time::Instant,
59};
60use sum_tree::{Edit, SumTree, TreeSet};
61use text::{Bias, BufferId};
62use util::{ResultExt, debug_panic, post_inc};
63use worktree::{
64 File, PathKey, PathProgress, PathSummary, PathTarget, UpdatedGitRepositoriesSet,
65 UpdatedGitRepository, Worktree,
66};
67
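/// State for all git repositories in a project.
///
/// Tracks the repositories discovered in each worktree, the buffer diffs
/// (unstaged and uncommitted) opened against them, and, when the project is
/// shared or remote, the proto clients used to synchronize that state.
///
/// A rough sketch of opening a diff through the store, assuming a
/// `git_store: Entity<GitStore>` and a `buffer: Entity<Buffer>` that belongs
/// to the project:
///
/// ```ignore
/// // Sketch only: open the buffer's diff against HEAD.
/// let diff = git_store
///     .update(cx, |store, cx| store.open_uncommitted_diff(buffer, cx))
///     .await?;
/// ```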
68pub struct GitStore {
69 state: GitStoreState,
70 buffer_store: Entity<BufferStore>,
71 worktree_store: Entity<WorktreeStore>,
72 repositories: HashMap<RepositoryId, Entity<Repository>>,
73 active_repo_id: Option<RepositoryId>,
74 #[allow(clippy::type_complexity)]
75 loading_diffs:
76 HashMap<(BufferId, DiffKind), Shared<Task<Result<Entity<BufferDiff>, Arc<anyhow::Error>>>>>,
77 diffs: HashMap<BufferId, Entity<BufferDiffState>>,
78 shared_diffs: HashMap<proto::PeerId, HashMap<BufferId, SharedDiffs>>,
79 _subscriptions: Vec<Subscription>,
80}
81
82#[derive(Default)]
83struct SharedDiffs {
84 unstaged: Option<Entity<BufferDiff>>,
85 uncommitted: Option<Entity<BufferDiff>>,
86}
87
88struct BufferDiffState {
89 unstaged_diff: Option<WeakEntity<BufferDiff>>,
90 uncommitted_diff: Option<WeakEntity<BufferDiff>>,
91 recalculate_diff_task: Option<Task<Result<()>>>,
92 language: Option<Arc<Language>>,
93 language_registry: Option<Arc<LanguageRegistry>>,
94 recalculating_tx: postage::watch::Sender<bool>,
95
96 /// These operation counts are used to ensure that head and index text
97 /// values read from the git repository are up-to-date with any hunk staging
98 /// operations that have been performed on the BufferDiff.
99 ///
    /// The operation count is incremented immediately when the user initiates a
    /// hunk stage/unstage operation. Then, upon finishing writing the new index
    /// text to disk, `hunk_staging_operation_count_as_of_write` is updated to
    /// reflect the operation count that prompted the write.
104 hunk_staging_operation_count: usize,
105 hunk_staging_operation_count_as_of_write: usize,
106
107 head_text: Option<Arc<String>>,
108 index_text: Option<Arc<String>>,
109 head_changed: bool,
110 index_changed: bool,
111 language_changed: bool,
112}
113
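/// Describes which diff base texts changed for a buffer: the index text, the
/// HEAD text, or both (either to distinct values or to the same value).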
114#[derive(Clone, Debug)]
115enum DiffBasesChange {
116 SetIndex(Option<String>),
117 SetHead(Option<String>),
118 SetEach {
119 index: Option<String>,
120 head: Option<String>,
121 },
122 SetBoth(Option<String>),
123}
124
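/// The kind of diff tracked for a buffer: against the index (unstaged changes)
/// or against HEAD (uncommitted changes).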
125#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
126enum DiffKind {
127 Unstaged,
128 Uncommitted,
129}
130
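/// Where this `GitStore`'s data comes from: the local filesystem, an SSH host,
/// or a remote collaborator's project.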
131enum GitStoreState {
132 Local {
133 next_repository_id: Arc<AtomicU64>,
134 downstream: Option<LocalDownstreamState>,
135 project_environment: Entity<ProjectEnvironment>,
136 fs: Arc<dyn Fs>,
137 },
138 Ssh {
139 upstream_client: AnyProtoClient,
140 upstream_project_id: ProjectId,
141 downstream: Option<(AnyProtoClient, ProjectId)>,
142 },
143 Remote {
144 upstream_client: AnyProtoClient,
145 upstream_project_id: ProjectId,
146 },
147}
148
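/// A repository update to forward to the downstream (shared) client.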
149enum DownstreamUpdate {
150 UpdateRepository(RepositorySnapshot),
151 RemoveRepository(RepositoryId),
152}
153
154struct LocalDownstreamState {
155 client: AnyProtoClient,
156 project_id: ProjectId,
157 updates_tx: mpsc::UnboundedSender<DownstreamUpdate>,
158 _task: Task<Result<()>>,
159}
160
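/// A checkpoint of the git state of every repository in the store, keyed by
/// each repository's work directory path.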
161#[derive(Clone)]
162pub struct GitStoreCheckpoint {
163 checkpoints_by_work_dir_abs_path: HashMap<Arc<Path>, GitRepositoryCheckpoint>,
164}
165
166#[derive(Clone, Debug, PartialEq, Eq)]
167pub struct StatusEntry {
168 pub repo_path: RepoPath,
169 pub status: FileStatus,
170}
171
172impl StatusEntry {
173 fn to_proto(&self) -> proto::StatusEntry {
174 let simple_status = match self.status {
175 FileStatus::Ignored | FileStatus::Untracked => proto::GitStatus::Added as i32,
176 FileStatus::Unmerged { .. } => proto::GitStatus::Conflict as i32,
177 FileStatus::Tracked(TrackedStatus {
178 index_status,
179 worktree_status,
180 }) => tracked_status_to_proto(if worktree_status != StatusCode::Unmodified {
181 worktree_status
182 } else {
183 index_status
184 }),
185 };
186
187 proto::StatusEntry {
188 repo_path: self.repo_path.as_ref().to_proto(),
189 simple_status,
190 status: Some(status_to_proto(self.status)),
191 }
192 }
193}
194
195impl TryFrom<proto::StatusEntry> for StatusEntry {
196 type Error = anyhow::Error;
197
198 fn try_from(value: proto::StatusEntry) -> Result<Self, Self::Error> {
199 let repo_path = RepoPath(Arc::<Path>::from_proto(value.repo_path));
200 let status = status_from_proto(value.simple_status, value.status)?;
201 Ok(Self { repo_path, status })
202 }
203}
204
205impl sum_tree::Item for StatusEntry {
206 type Summary = PathSummary<GitSummary>;
207
208 fn summary(&self, _: &<Self::Summary as sum_tree::Summary>::Context) -> Self::Summary {
209 PathSummary {
210 max_path: self.repo_path.0.clone(),
211 item_summary: self.status.summary(),
212 }
213 }
214}
215
216impl sum_tree::KeyedItem for StatusEntry {
217 type Key = PathKey;
218
219 fn key(&self) -> Self::Key {
220 PathKey(self.repo_path.0.clone())
221 }
222}
223
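/// An identifier for a repository within a `GitStore`.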
224#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
225pub struct RepositoryId(pub u64);
226
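/// An immutable snapshot of a repository's state: its branch, merge state,
/// and per-path file statuses.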
227#[derive(Clone, Debug, PartialEq, Eq)]
228pub struct RepositorySnapshot {
229 pub id: RepositoryId,
230 pub merge_message: Option<SharedString>,
231 pub statuses_by_path: SumTree<StatusEntry>,
232 pub work_directory_abs_path: Arc<Path>,
233 pub branch: Option<Branch>,
234 pub merge_conflicts: TreeSet<RepoPath>,
235 pub merge_head_shas: Vec<SharedString>,
236 pub scan_id: u64,
237}
238
239type JobId = u64;
240
241#[derive(Clone, Debug, PartialEq, Eq)]
242pub struct JobInfo {
243 pub start: Instant,
244 pub message: SharedString,
245}
246
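/// A single git repository tracked by the `GitStore`, wrapping a
/// `RepositorySnapshot` together with the queue of git jobs operating on it.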
247pub struct Repository {
248 this: WeakEntity<Self>,
249 snapshot: RepositorySnapshot,
250 commit_message_buffer: Option<Entity<Buffer>>,
251 git_store: WeakEntity<GitStore>,
252 // For a local repository, holds paths that have had worktree events since the last status scan completed,
253 // and that should be examined during the next status scan.
254 paths_needing_status_update: BTreeSet<RepoPath>,
255 job_sender: mpsc::UnboundedSender<GitJob>,
256 active_jobs: HashMap<JobId, JobInfo>,
257 job_id: JobId,
258 askpass_delegates: Arc<Mutex<HashMap<u64, AskPassDelegate>>>,
259 latest_askpass_id: u64,
260}
261
262impl std::ops::Deref for Repository {
263 type Target = RepositorySnapshot;
264
265 fn deref(&self) -> &Self::Target {
266 &self.snapshot
267 }
268}
269
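/// How git operations are performed for a repository: directly against a local
/// backend, or by proxying requests to a remote project.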
270#[derive(Clone)]
271pub enum RepositoryState {
272 Local {
273 backend: Arc<dyn GitRepository>,
274 environment: Arc<HashMap<String, String>>,
275 },
276 Remote {
277 project_id: ProjectId,
278 client: AnyProtoClient,
279 },
280}
281
282#[derive(Clone, Debug)]
283pub enum RepositoryEvent {
284 Updated { full_scan: bool },
285 MergeHeadsChanged,
286}
287
288#[derive(Clone, Debug)]
289pub struct JobsUpdated;
290
291#[derive(Debug)]
292pub enum GitStoreEvent {
293 ActiveRepositoryChanged(Option<RepositoryId>),
294 RepositoryUpdated(RepositoryId, RepositoryEvent, bool),
295 RepositoryAdded(RepositoryId),
296 RepositoryRemoved(RepositoryId),
297 IndexWriteError(anyhow::Error),
298 JobsUpdated,
299}
300
301impl EventEmitter<RepositoryEvent> for Repository {}
302impl EventEmitter<JobsUpdated> for Repository {}
303impl EventEmitter<GitStoreEvent> for GitStore {}
304
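/// A unit of git work to run against a repository, optionally tagged with a
/// `GitJobKey`.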
305pub struct GitJob {
306 job: Box<dyn FnOnce(RepositoryState, &mut AsyncApp) -> Task<()>>,
307 key: Option<GitJobKey>,
308}
309
310#[derive(PartialEq, Eq)]
311enum GitJobKey {
312 WriteIndex(RepoPath),
313 ReloadBufferDiffBases,
314 RefreshStatuses,
315 ReloadGitState,
316}
317
318impl GitStore {
319 pub fn local(
320 worktree_store: &Entity<WorktreeStore>,
321 buffer_store: Entity<BufferStore>,
322 environment: Entity<ProjectEnvironment>,
323 fs: Arc<dyn Fs>,
324 cx: &mut Context<Self>,
325 ) -> Self {
326 Self::new(
327 worktree_store.clone(),
328 buffer_store,
329 GitStoreState::Local {
330 next_repository_id: Arc::new(AtomicU64::new(1)),
331 downstream: None,
332 project_environment: environment,
333 fs,
334 },
335 cx,
336 )
337 }
338
339 pub fn remote(
340 worktree_store: &Entity<WorktreeStore>,
341 buffer_store: Entity<BufferStore>,
342 upstream_client: AnyProtoClient,
343 project_id: ProjectId,
344 cx: &mut Context<Self>,
345 ) -> Self {
346 Self::new(
347 worktree_store.clone(),
348 buffer_store,
349 GitStoreState::Remote {
350 upstream_client,
351 upstream_project_id: project_id,
352 },
353 cx,
354 )
355 }
356
357 pub fn ssh(
358 worktree_store: &Entity<WorktreeStore>,
359 buffer_store: Entity<BufferStore>,
360 upstream_client: AnyProtoClient,
361 cx: &mut Context<Self>,
362 ) -> Self {
363 Self::new(
364 worktree_store.clone(),
365 buffer_store,
366 GitStoreState::Ssh {
367 upstream_client,
368 upstream_project_id: ProjectId(SSH_PROJECT_ID),
369 downstream: None,
370 },
371 cx,
372 )
373 }
374
375 fn new(
376 worktree_store: Entity<WorktreeStore>,
377 buffer_store: Entity<BufferStore>,
378 state: GitStoreState,
379 cx: &mut Context<Self>,
380 ) -> Self {
381 let _subscriptions = vec![
382 cx.subscribe(&worktree_store, Self::on_worktree_store_event),
383 cx.subscribe(&buffer_store, Self::on_buffer_store_event),
384 ];
385
386 GitStore {
387 state,
388 buffer_store,
389 worktree_store,
390 repositories: HashMap::default(),
391 active_repo_id: None,
392 _subscriptions,
393 loading_diffs: HashMap::default(),
394 shared_diffs: HashMap::default(),
395 diffs: HashMap::default(),
396 }
397 }
398
399 pub fn init(client: &AnyProtoClient) {
400 client.add_entity_request_handler(Self::handle_get_remotes);
401 client.add_entity_request_handler(Self::handle_get_branches);
402 client.add_entity_request_handler(Self::handle_change_branch);
403 client.add_entity_request_handler(Self::handle_create_branch);
404 client.add_entity_request_handler(Self::handle_git_init);
405 client.add_entity_request_handler(Self::handle_push);
406 client.add_entity_request_handler(Self::handle_pull);
407 client.add_entity_request_handler(Self::handle_fetch);
408 client.add_entity_request_handler(Self::handle_stage);
409 client.add_entity_request_handler(Self::handle_unstage);
410 client.add_entity_request_handler(Self::handle_commit);
411 client.add_entity_request_handler(Self::handle_reset);
412 client.add_entity_request_handler(Self::handle_show);
413 client.add_entity_request_handler(Self::handle_load_commit_diff);
414 client.add_entity_request_handler(Self::handle_checkout_files);
415 client.add_entity_request_handler(Self::handle_open_commit_message_buffer);
416 client.add_entity_request_handler(Self::handle_set_index_text);
417 client.add_entity_request_handler(Self::handle_askpass);
418 client.add_entity_request_handler(Self::handle_check_for_pushed_commits);
419 client.add_entity_request_handler(Self::handle_git_diff);
420 client.add_entity_request_handler(Self::handle_open_unstaged_diff);
421 client.add_entity_request_handler(Self::handle_open_uncommitted_diff);
422 client.add_entity_message_handler(Self::handle_update_diff_bases);
423 client.add_entity_request_handler(Self::handle_get_permalink_to_line);
424 client.add_entity_request_handler(Self::handle_blame_buffer);
425 client.add_entity_message_handler(Self::handle_update_repository);
426 client.add_entity_message_handler(Self::handle_remove_repository);
427 }
428
429 pub fn is_local(&self) -> bool {
430 matches!(self.state, GitStoreState::Local { .. })
431 }
432
433 pub fn shared(&mut self, project_id: u64, client: AnyProtoClient, cx: &mut Context<Self>) {
434 match &mut self.state {
435 GitStoreState::Ssh {
436 downstream: downstream_client,
437 ..
438 } => {
439 for repo in self.repositories.values() {
440 let update = repo.read(cx).snapshot.initial_update(project_id);
441 for update in split_repository_update(update) {
442 client.send(update).log_err();
443 }
444 }
445 *downstream_client = Some((client, ProjectId(project_id)));
446 }
447 GitStoreState::Local {
448 downstream: downstream_client,
449 ..
450 } => {
451 let mut snapshots = HashMap::default();
452 let (updates_tx, mut updates_rx) = mpsc::unbounded();
453 for repo in self.repositories.values() {
454 updates_tx
455 .unbounded_send(DownstreamUpdate::UpdateRepository(
456 repo.read(cx).snapshot.clone(),
457 ))
458 .ok();
459 }
460 *downstream_client = Some(LocalDownstreamState {
461 client: client.clone(),
462 project_id: ProjectId(project_id),
463 updates_tx,
464 _task: cx.spawn(async move |this, cx| {
465 cx.background_spawn(async move {
466 while let Some(update) = updates_rx.next().await {
467 match update {
468 DownstreamUpdate::UpdateRepository(snapshot) => {
469 if let Some(old_snapshot) = snapshots.get_mut(&snapshot.id)
470 {
471 let update =
472 snapshot.build_update(old_snapshot, project_id);
473 *old_snapshot = snapshot;
474 for update in split_repository_update(update) {
475 client.send(update)?;
476 }
477 } else {
478 let update = snapshot.initial_update(project_id);
479 for update in split_repository_update(update) {
480 client.send(update)?;
481 }
482 snapshots.insert(snapshot.id, snapshot);
483 }
484 }
485 DownstreamUpdate::RemoveRepository(id) => {
486 client.send(proto::RemoveRepository {
487 project_id,
488 id: id.to_proto(),
489 })?;
490 }
491 }
492 }
493 anyhow::Ok(())
494 })
495 .await
496 .ok();
497 this.update(cx, |this, _| {
498 if let GitStoreState::Local {
499 downstream: downstream_client,
500 ..
501 } = &mut this.state
502 {
503 downstream_client.take();
504 } else {
505 unreachable!("unshared called on remote store");
506 }
507 })
508 }),
509 });
510 }
511 GitStoreState::Remote { .. } => {
512 debug_panic!("shared called on remote store");
513 }
514 }
515 }
516
517 pub fn unshared(&mut self, _cx: &mut Context<Self>) {
518 match &mut self.state {
519 GitStoreState::Local {
520 downstream: downstream_client,
521 ..
522 } => {
523 downstream_client.take();
524 }
525 GitStoreState::Ssh {
526 downstream: downstream_client,
527 ..
528 } => {
529 downstream_client.take();
530 }
531 GitStoreState::Remote { .. } => {
532 debug_panic!("unshared called on remote store");
533 }
534 }
535 self.shared_diffs.clear();
536 }
537
538 pub(crate) fn forget_shared_diffs_for(&mut self, peer_id: &proto::PeerId) {
539 self.shared_diffs.remove(peer_id);
540 }
541
542 pub fn active_repository(&self) -> Option<Entity<Repository>> {
543 self.active_repo_id
544 .as_ref()
545 .map(|id| self.repositories[&id].clone())
546 }
547
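    /// Returns a diff of the buffer's contents against its staged (index) text,
    /// reusing an existing or in-flight diff for the buffer when there is one.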
548 pub fn open_unstaged_diff(
549 &mut self,
550 buffer: Entity<Buffer>,
551 cx: &mut Context<Self>,
552 ) -> Task<Result<Entity<BufferDiff>>> {
553 let buffer_id = buffer.read(cx).remote_id();
554 if let Some(diff_state) = self.diffs.get(&buffer_id) {
555 if let Some(unstaged_diff) = diff_state
556 .read(cx)
557 .unstaged_diff
558 .as_ref()
559 .and_then(|weak| weak.upgrade())
560 {
561 if let Some(task) =
562 diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
563 {
564 return cx.background_executor().spawn(async move {
565 task.await;
566 Ok(unstaged_diff)
567 });
568 }
569 return Task::ready(Ok(unstaged_diff));
570 }
571 }
572
573 let Some((repo, repo_path)) =
574 self.repository_and_path_for_buffer_id(buffer.read(cx).remote_id(), cx)
575 else {
576 return Task::ready(Err(anyhow!("failed to find git repository for buffer")));
577 };
578
579 let task = self
580 .loading_diffs
581 .entry((buffer_id, DiffKind::Unstaged))
582 .or_insert_with(|| {
583 let staged_text = repo.update(cx, |repo, cx| {
584 repo.load_staged_text(buffer_id, repo_path, cx)
585 });
586 cx.spawn(async move |this, cx| {
587 Self::open_diff_internal(
588 this,
589 DiffKind::Unstaged,
590 staged_text.await.map(DiffBasesChange::SetIndex),
591 buffer,
592 cx,
593 )
594 .await
595 .map_err(Arc::new)
596 })
597 .shared()
598 })
599 .clone();
600
601 cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
602 }
603
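    /// Returns a diff of the buffer's contents against its committed (HEAD) text,
    /// with the corresponding unstaged diff attached as its secondary diff.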
604 pub fn open_uncommitted_diff(
605 &mut self,
606 buffer: Entity<Buffer>,
607 cx: &mut Context<Self>,
608 ) -> Task<Result<Entity<BufferDiff>>> {
609 let buffer_id = buffer.read(cx).remote_id();
610
611 if let Some(diff_state) = self.diffs.get(&buffer_id) {
612 if let Some(uncommitted_diff) = diff_state
613 .read(cx)
614 .uncommitted_diff
615 .as_ref()
616 .and_then(|weak| weak.upgrade())
617 {
618 if let Some(task) =
619 diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
620 {
621 return cx.background_executor().spawn(async move {
622 task.await;
623 Ok(uncommitted_diff)
624 });
625 }
626 return Task::ready(Ok(uncommitted_diff));
627 }
628 }
629
630 let Some((repo, repo_path)) =
631 self.repository_and_path_for_buffer_id(buffer.read(cx).remote_id(), cx)
632 else {
633 return Task::ready(Err(anyhow!("failed to find git repository for buffer")));
634 };
635
636 let task = self
637 .loading_diffs
638 .entry((buffer_id, DiffKind::Uncommitted))
639 .or_insert_with(|| {
640 let changes = repo.update(cx, |repo, cx| {
641 repo.load_committed_text(buffer_id, repo_path, cx)
642 });
643
644 cx.spawn(async move |this, cx| {
645 Self::open_diff_internal(this, DiffKind::Uncommitted, changes.await, buffer, cx)
646 .await
647 .map_err(Arc::new)
648 })
649 .shared()
650 })
651 .clone();
652
653 cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
654 }
655
656 async fn open_diff_internal(
657 this: WeakEntity<Self>,
658 kind: DiffKind,
659 texts: Result<DiffBasesChange>,
660 buffer_entity: Entity<Buffer>,
661 cx: &mut AsyncApp,
662 ) -> Result<Entity<BufferDiff>> {
663 let diff_bases_change = match texts {
664 Err(e) => {
665 this.update(cx, |this, cx| {
666 let buffer = buffer_entity.read(cx);
667 let buffer_id = buffer.remote_id();
668 this.loading_diffs.remove(&(buffer_id, kind));
669 })?;
670 return Err(e);
671 }
672 Ok(change) => change,
673 };
674
675 this.update(cx, |this, cx| {
676 let buffer = buffer_entity.read(cx);
677 let buffer_id = buffer.remote_id();
678 let language = buffer.language().cloned();
679 let language_registry = buffer.language_registry();
680 let text_snapshot = buffer.text_snapshot();
681 this.loading_diffs.remove(&(buffer_id, kind));
682
683 let diff_state = this
684 .diffs
685 .entry(buffer_id)
686 .or_insert_with(|| cx.new(|_| BufferDiffState::default()));
687
688 let diff = cx.new(|cx| BufferDiff::new(&text_snapshot, cx));
689
690 cx.subscribe(&diff, Self::on_buffer_diff_event).detach();
691 diff_state.update(cx, |diff_state, cx| {
692 diff_state.language = language;
693 diff_state.language_registry = language_registry;
694
695 match kind {
696 DiffKind::Unstaged => diff_state.unstaged_diff = Some(diff.downgrade()),
697 DiffKind::Uncommitted => {
698 let unstaged_diff = if let Some(diff) = diff_state.unstaged_diff() {
699 diff
700 } else {
701 let unstaged_diff = cx.new(|cx| BufferDiff::new(&text_snapshot, cx));
702 diff_state.unstaged_diff = Some(unstaged_diff.downgrade());
703 unstaged_diff
704 };
705
706 diff.update(cx, |diff, _| diff.set_secondary_diff(unstaged_diff));
707 diff_state.uncommitted_diff = Some(diff.downgrade())
708 }
709 }
710
711 diff_state.diff_bases_changed(text_snapshot, Some(diff_bases_change), cx);
712 let rx = diff_state.wait_for_recalculation();
713
714 anyhow::Ok(async move {
715 if let Some(rx) = rx {
716 rx.await;
717 }
718 Ok(diff)
719 })
720 })
721 })??
722 .await
723 }
724
725 pub fn get_unstaged_diff(&self, buffer_id: BufferId, cx: &App) -> Option<Entity<BufferDiff>> {
726 let diff_state = self.diffs.get(&buffer_id)?;
727 diff_state.read(cx).unstaged_diff.as_ref()?.upgrade()
728 }
729
730 pub fn get_uncommitted_diff(
731 &self,
732 buffer_id: BufferId,
733 cx: &App,
734 ) -> Option<Entity<BufferDiff>> {
735 let diff_state = self.diffs.get(&buffer_id)?;
736 diff_state.read(cx).uncommitted_diff.as_ref()?.upgrade()
737 }
738
739 pub fn project_path_git_status(
740 &self,
741 project_path: &ProjectPath,
742 cx: &App,
743 ) -> Option<FileStatus> {
744 let (repo, repo_path) = self.repository_and_path_for_project_path(project_path, cx)?;
745 Some(repo.read(cx).status_for_path(&repo_path)?.status)
746 }
747
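    /// Captures a checkpoint of the git state of every repository in the store.
    ///
    /// A rough usage sketch, assuming a `git_store: Entity<GitStore>` and an
    /// async context in which the returned tasks can be awaited:
    ///
    /// ```ignore
    /// // Sketch only: capture the current git state, then roll back to it later.
    /// let checkpoint = git_store.update(cx, |store, cx| store.checkpoint(cx)).await?;
    /// // ... perform some destructive operation on the working trees ...
    /// git_store.update(cx, |store, cx| store.restore_checkpoint(checkpoint, cx)).await?;
    /// ```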
748 pub fn checkpoint(&self, cx: &mut App) -> Task<Result<GitStoreCheckpoint>> {
749 let mut work_directory_abs_paths = Vec::new();
750 let mut checkpoints = Vec::new();
751 for repository in self.repositories.values() {
752 repository.update(cx, |repository, _| {
753 work_directory_abs_paths.push(repository.snapshot.work_directory_abs_path.clone());
754 checkpoints.push(repository.checkpoint().map(|checkpoint| checkpoint?));
755 });
756 }
757
758 cx.background_executor().spawn(async move {
759 let checkpoints = future::try_join_all(checkpoints).await?;
760 Ok(GitStoreCheckpoint {
761 checkpoints_by_work_dir_abs_path: work_directory_abs_paths
762 .into_iter()
763 .zip(checkpoints)
764 .collect(),
765 })
766 })
767 }
768
769 pub fn restore_checkpoint(
770 &self,
771 checkpoint: GitStoreCheckpoint,
772 cx: &mut App,
773 ) -> Task<Result<()>> {
774 let repositories_by_work_dir_abs_path = self
775 .repositories
776 .values()
777 .map(|repo| (repo.read(cx).snapshot.work_directory_abs_path.clone(), repo))
778 .collect::<HashMap<_, _>>();
779
780 let mut tasks = Vec::new();
781 for (work_dir_abs_path, checkpoint) in checkpoint.checkpoints_by_work_dir_abs_path {
782 if let Some(repository) = repositories_by_work_dir_abs_path.get(&work_dir_abs_path) {
783 let restore = repository.update(cx, |repository, _| {
784 repository.restore_checkpoint(checkpoint)
785 });
786 tasks.push(async move { restore.await? });
787 }
788 }
789 cx.background_spawn(async move {
790 future::try_join_all(tasks).await?;
791 Ok(())
792 })
793 }
794
795 /// Compares two checkpoints, returning true if they are equal.
796 pub fn compare_checkpoints(
797 &self,
798 left: GitStoreCheckpoint,
799 mut right: GitStoreCheckpoint,
800 cx: &mut App,
801 ) -> Task<Result<bool>> {
802 let repositories_by_work_dir_abs_path = self
803 .repositories
804 .values()
805 .map(|repo| (repo.read(cx).snapshot.work_directory_abs_path.clone(), repo))
806 .collect::<HashMap<_, _>>();
807
808 let mut tasks = Vec::new();
809 for (work_dir_abs_path, left_checkpoint) in left.checkpoints_by_work_dir_abs_path {
810 if let Some(right_checkpoint) = right
811 .checkpoints_by_work_dir_abs_path
812 .remove(&work_dir_abs_path)
813 {
814 if let Some(repository) = repositories_by_work_dir_abs_path.get(&work_dir_abs_path)
815 {
816 let compare = repository.update(cx, |repository, _| {
817 repository.compare_checkpoints(left_checkpoint, right_checkpoint)
818 });
819
820 tasks.push(async move { compare.await? });
821 }
822 } else {
823 return Task::ready(Ok(false));
824 }
825 }
826 cx.background_spawn(async move {
827 Ok(future::try_join_all(tasks)
828 .await?
829 .into_iter()
830 .all(|result| result))
831 })
832 }
833
834 pub fn delete_checkpoint(
835 &self,
836 checkpoint: GitStoreCheckpoint,
837 cx: &mut App,
838 ) -> Task<Result<()>> {
839 let repositories_by_work_directory_abs_path = self
840 .repositories
841 .values()
842 .map(|repo| (repo.read(cx).snapshot.work_directory_abs_path.clone(), repo))
843 .collect::<HashMap<_, _>>();
844
845 let mut tasks = Vec::new();
846 for (work_dir_abs_path, checkpoint) in checkpoint.checkpoints_by_work_dir_abs_path {
847 if let Some(repository) =
848 repositories_by_work_directory_abs_path.get(&work_dir_abs_path)
849 {
850 let delete = repository.update(cx, |this, _| this.delete_checkpoint(checkpoint));
851 tasks.push(async move { delete.await? });
852 }
853 }
854 cx.background_spawn(async move {
855 future::try_join_all(tasks).await?;
856 Ok(())
857 })
858 }
859
860 /// Blames a buffer.
861 pub fn blame_buffer(
862 &self,
863 buffer: &Entity<Buffer>,
864 version: Option<clock::Global>,
865 cx: &mut App,
866 ) -> Task<Result<Option<Blame>>> {
867 let buffer = buffer.read(cx);
868 let Some((repo, repo_path)) =
869 self.repository_and_path_for_buffer_id(buffer.remote_id(), cx)
870 else {
871 return Task::ready(Err(anyhow!("failed to find a git repository for buffer")));
872 };
873 let content = match &version {
874 Some(version) => buffer.rope_for_version(version).clone(),
875 None => buffer.as_rope().clone(),
876 };
877 let version = version.unwrap_or(buffer.version());
878 let buffer_id = buffer.remote_id();
879
880 let rx = repo.update(cx, |repo, _| {
881 repo.send_job(None, move |state, _| async move {
882 match state {
883 RepositoryState::Local { backend, .. } => backend
884 .blame(repo_path.clone(), content)
885 .await
886 .with_context(|| format!("Failed to blame {:?}", repo_path.0))
887 .map(Some),
888 RepositoryState::Remote { project_id, client } => {
889 let response = client
890 .request(proto::BlameBuffer {
891 project_id: project_id.to_proto(),
892 buffer_id: buffer_id.into(),
893 version: serialize_version(&version),
894 })
895 .await?;
896 Ok(deserialize_blame_buffer_response(response))
897 }
898 }
899 })
900 });
901
902 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
903 }
904
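    /// Builds a permalink URL to the given line range of the buffer's file on the
    /// repository's git hosting provider, falling back to Cargo registry metadata
    /// for Rust sources that are outside any repository.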
905 pub fn get_permalink_to_line(
906 &self,
907 buffer: &Entity<Buffer>,
908 selection: Range<u32>,
909 cx: &mut App,
910 ) -> Task<Result<url::Url>> {
911 let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
912 return Task::ready(Err(anyhow!("buffer has no file")));
913 };
914
915 let Some((repo, repo_path)) = self.repository_and_path_for_project_path(
916 &(file.worktree.read(cx).id(), file.path.clone()).into(),
917 cx,
918 ) else {
919 // If we're not in a Git repo, check whether this is a Rust source
920 // file in the Cargo registry (presumably opened with go-to-definition
921 // from a normal Rust file). If so, we can put together a permalink
922 // using crate metadata.
923 if buffer
924 .read(cx)
925 .language()
926 .is_none_or(|lang| lang.name() != "Rust".into())
927 {
928 return Task::ready(Err(anyhow!("no permalink available")));
929 }
930 let Some(file_path) = file.worktree.read(cx).absolutize(&file.path).ok() else {
931 return Task::ready(Err(anyhow!("no permalink available")));
932 };
933 return cx.spawn(async move |cx| {
934 let provider_registry = cx.update(GitHostingProviderRegistry::default_global)?;
935 get_permalink_in_rust_registry_src(provider_registry, file_path, selection)
936 .map_err(|_| anyhow!("no permalink available"))
937 });
938
939 // TODO remote case
940 };
941
942 let buffer_id = buffer.read(cx).remote_id();
943 let branch = repo.read(cx).branch.clone();
944 let remote = branch
945 .as_ref()
946 .and_then(|b| b.upstream.as_ref())
947 .and_then(|b| b.remote_name())
948 .unwrap_or("origin")
949 .to_string();
950
951 let rx = repo.update(cx, |repo, _| {
952 repo.send_job(None, move |state, cx| async move {
953 match state {
954 RepositoryState::Local { backend, .. } => {
955 let origin_url = backend
956 .remote_url(&remote)
957 .ok_or_else(|| anyhow!("remote \"{remote}\" not found"))?;
958
959 let sha = backend
960 .head_sha()
961 .ok_or_else(|| anyhow!("failed to read HEAD SHA"))?;
962
963 let provider_registry =
964 cx.update(GitHostingProviderRegistry::default_global)?;
965
966 let (provider, remote) =
967 parse_git_remote_url(provider_registry, &origin_url)
968 .ok_or_else(|| anyhow!("failed to parse Git remote URL"))?;
969
970 let path = repo_path
971 .to_str()
972 .ok_or_else(|| anyhow!("failed to convert path to string"))?;
973
974 Ok(provider.build_permalink(
975 remote,
976 BuildPermalinkParams {
977 sha: &sha,
978 path,
979 selection: Some(selection),
980 },
981 ))
982 }
983 RepositoryState::Remote { project_id, client } => {
984 let response = client
985 .request(proto::GetPermalinkToLine {
986 project_id: project_id.to_proto(),
987 buffer_id: buffer_id.into(),
988 selection: Some(proto::Range {
989 start: selection.start as u64,
990 end: selection.end as u64,
991 }),
992 })
993 .await?;
994
995 url::Url::parse(&response.permalink).context("failed to parse permalink")
996 }
997 }
998 })
999 });
1000 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
1001 }
1002
1003 fn downstream_client(&self) -> Option<(AnyProtoClient, ProjectId)> {
1004 match &self.state {
1005 GitStoreState::Local {
1006 downstream: downstream_client,
1007 ..
1008 } => downstream_client
1009 .as_ref()
1010 .map(|state| (state.client.clone(), state.project_id)),
1011 GitStoreState::Ssh {
1012 downstream: downstream_client,
1013 ..
1014 } => downstream_client.clone(),
1015 GitStoreState::Remote { .. } => None,
1016 }
1017 }
1018
1019 fn upstream_client(&self) -> Option<AnyProtoClient> {
1020 match &self.state {
1021 GitStoreState::Local { .. } => None,
1022 GitStoreState::Ssh {
1023 upstream_client, ..
1024 }
1025 | GitStoreState::Remote {
1026 upstream_client, ..
1027 } => Some(upstream_client.clone()),
1028 }
1029 }
1030
1031 fn on_worktree_store_event(
1032 &mut self,
1033 worktree_store: Entity<WorktreeStore>,
1034 event: &WorktreeStoreEvent,
1035 cx: &mut Context<Self>,
1036 ) {
1037 let GitStoreState::Local {
1038 project_environment,
1039 downstream,
1040 next_repository_id,
1041 fs,
1042 } = &self.state
1043 else {
1044 return;
1045 };
1046
1047 match event {
1048 WorktreeStoreEvent::WorktreeUpdatedEntries(worktree_id, updated_entries) => {
1049 let mut paths_by_git_repo = HashMap::<_, Vec<_>>::default();
1050 for (relative_path, _, _) in updated_entries.iter() {
1051 let Some((repo, repo_path)) = self.repository_and_path_for_project_path(
1052 &(*worktree_id, relative_path.clone()).into(),
1053 cx,
1054 ) else {
1055 continue;
1056 };
1057 paths_by_git_repo.entry(repo).or_default().push(repo_path)
1058 }
1059
1060 for (repo, paths) in paths_by_git_repo {
1061 repo.update(cx, |repo, cx| {
1062 repo.paths_changed(
1063 paths,
1064 downstream
1065 .as_ref()
1066 .map(|downstream| downstream.updates_tx.clone()),
1067 cx,
1068 );
1069 });
1070 }
1071 }
1072 WorktreeStoreEvent::WorktreeUpdatedGitRepositories(worktree_id, changed_repos) => {
1073 let Some(worktree) = worktree_store.read(cx).worktree_for_id(*worktree_id, cx)
1074 else {
1075 return;
1076 };
1077 if !worktree.read(cx).is_visible() {
1078 log::debug!(
1079 "not adding repositories for local worktree {:?} because it's not visible",
1080 worktree.read(cx).abs_path()
1081 );
1082 return;
1083 }
1084 self.update_repositories_from_worktree(
1085 project_environment.clone(),
1086 next_repository_id.clone(),
1087 downstream
1088 .as_ref()
1089 .map(|downstream| downstream.updates_tx.clone()),
1090 changed_repos.clone(),
1091 fs.clone(),
1092 cx,
1093 );
1094 self.local_worktree_git_repos_changed(worktree, changed_repos, cx);
1095 }
1096 _ => {}
1097 }
1098 }
1099
1100 fn on_repository_event(
1101 &mut self,
1102 repo: Entity<Repository>,
1103 event: &RepositoryEvent,
1104 cx: &mut Context<Self>,
1105 ) {
1106 let id = repo.read(cx).id;
1107 cx.emit(GitStoreEvent::RepositoryUpdated(
1108 id,
1109 event.clone(),
1110 self.active_repo_id == Some(id),
1111 ))
1112 }
1113
1114 fn on_jobs_updated(&mut self, _: Entity<Repository>, _: &JobsUpdated, cx: &mut Context<Self>) {
1115 cx.emit(GitStoreEvent::JobsUpdated)
1116 }
1117
    /// Updates our list of repositories and schedules git scans in response to a notification from a worktree.
1119 fn update_repositories_from_worktree(
1120 &mut self,
1121 project_environment: Entity<ProjectEnvironment>,
1122 next_repository_id: Arc<AtomicU64>,
1123 updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
1124 updated_git_repositories: UpdatedGitRepositoriesSet,
1125 fs: Arc<dyn Fs>,
1126 cx: &mut Context<Self>,
1127 ) {
1128 let mut removed_ids = Vec::new();
1129 for update in updated_git_repositories.iter() {
1130 if let Some((id, existing)) = self.repositories.iter().find(|(_, repo)| {
1131 let existing_work_directory_abs_path =
1132 repo.read(cx).work_directory_abs_path.clone();
1133 Some(&existing_work_directory_abs_path)
1134 == update.old_work_directory_abs_path.as_ref()
1135 || Some(&existing_work_directory_abs_path)
1136 == update.new_work_directory_abs_path.as_ref()
1137 }) {
1138 if let Some(new_work_directory_abs_path) =
1139 update.new_work_directory_abs_path.clone()
1140 {
1141 existing.update(cx, |existing, cx| {
1142 existing.snapshot.work_directory_abs_path = new_work_directory_abs_path;
1143 existing.schedule_scan(updates_tx.clone(), cx);
1144 });
1145 } else {
1146 removed_ids.push(*id);
1147 }
1148 } else if let UpdatedGitRepository {
1149 new_work_directory_abs_path: Some(work_directory_abs_path),
1150 dot_git_abs_path: Some(dot_git_abs_path),
1151 repository_dir_abs_path: Some(repository_dir_abs_path),
1152 common_dir_abs_path: Some(common_dir_abs_path),
1153 ..
1154 } = update
1155 {
1156 let id = RepositoryId(next_repository_id.fetch_add(1, atomic::Ordering::Release));
1157 let git_store = cx.weak_entity();
1158 let repo = cx.new(|cx| {
1159 let mut repo = Repository::local(
1160 id,
1161 work_directory_abs_path.clone(),
1162 dot_git_abs_path.clone(),
1163 repository_dir_abs_path.clone(),
1164 common_dir_abs_path.clone(),
1165 project_environment.downgrade(),
1166 fs.clone(),
1167 git_store,
1168 cx,
1169 );
1170 repo.schedule_scan(updates_tx.clone(), cx);
1171 repo
1172 });
1173 self._subscriptions
1174 .push(cx.subscribe(&repo, Self::on_repository_event));
1175 self._subscriptions
1176 .push(cx.subscribe(&repo, Self::on_jobs_updated));
1177 self.repositories.insert(id, repo);
1178 cx.emit(GitStoreEvent::RepositoryAdded(id));
1179 self.active_repo_id.get_or_insert_with(|| {
1180 cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
1181 id
1182 });
1183 }
1184 }
1185
1186 for id in removed_ids {
1187 if self.active_repo_id == Some(id) {
1188 self.active_repo_id = None;
1189 cx.emit(GitStoreEvent::ActiveRepositoryChanged(None));
1190 }
1191 self.repositories.remove(&id);
1192 if let Some(updates_tx) = updates_tx.as_ref() {
1193 updates_tx
1194 .unbounded_send(DownstreamUpdate::RemoveRepository(id))
1195 .ok();
1196 }
1197 }
1198 }
1199
1200 fn on_buffer_store_event(
1201 &mut self,
1202 _: Entity<BufferStore>,
1203 event: &BufferStoreEvent,
1204 cx: &mut Context<Self>,
1205 ) {
1206 match event {
1207 BufferStoreEvent::BufferAdded(buffer) => {
1208 cx.subscribe(&buffer, |this, buffer, event, cx| {
1209 if let BufferEvent::LanguageChanged = event {
1210 let buffer_id = buffer.read(cx).remote_id();
1211 if let Some(diff_state) = this.diffs.get(&buffer_id) {
1212 diff_state.update(cx, |diff_state, cx| {
1213 diff_state.buffer_language_changed(buffer, cx);
1214 });
1215 }
1216 }
1217 })
1218 .detach();
1219 }
1220 BufferStoreEvent::SharedBufferClosed(peer_id, buffer_id) => {
1221 if let Some(diffs) = self.shared_diffs.get_mut(peer_id) {
1222 diffs.remove(buffer_id);
1223 }
1224 }
1225 BufferStoreEvent::BufferDropped(buffer_id) => {
1226 self.diffs.remove(&buffer_id);
1227 for diffs in self.shared_diffs.values_mut() {
1228 diffs.remove(buffer_id);
1229 }
1230 }
1231
1232 _ => {}
1233 }
1234 }
1235
1236 pub fn recalculate_buffer_diffs(
1237 &mut self,
1238 buffers: Vec<Entity<Buffer>>,
1239 cx: &mut Context<Self>,
1240 ) -> impl Future<Output = ()> + use<> {
1241 let mut futures = Vec::new();
1242 for buffer in buffers {
1243 if let Some(diff_state) = self.diffs.get_mut(&buffer.read(cx).remote_id()) {
1244 let buffer = buffer.read(cx).text_snapshot();
1245 diff_state.update(cx, |diff_state, cx| {
1246 diff_state.recalculate_diffs(buffer, cx);
1247 futures.extend(diff_state.wait_for_recalculation());
1248 });
1249 }
1250 }
1251 async move {
1252 futures::future::join_all(futures).await;
1253 }
1254 }
1255
1256 fn on_buffer_diff_event(
1257 &mut self,
1258 diff: Entity<buffer_diff::BufferDiff>,
1259 event: &BufferDiffEvent,
1260 cx: &mut Context<Self>,
1261 ) {
1262 if let BufferDiffEvent::HunksStagedOrUnstaged(new_index_text) = event {
1263 let buffer_id = diff.read(cx).buffer_id;
1264 if let Some(diff_state) = self.diffs.get(&buffer_id) {
1265 let hunk_staging_operation_count = diff_state.update(cx, |diff_state, _| {
1266 diff_state.hunk_staging_operation_count += 1;
1267 diff_state.hunk_staging_operation_count
1268 });
1269 if let Some((repo, path)) = self.repository_and_path_for_buffer_id(buffer_id, cx) {
1270 let recv = repo.update(cx, |repo, cx| {
1271 log::debug!("hunks changed for {}", path.display());
1272 repo.spawn_set_index_text_job(
1273 path,
1274 new_index_text.as_ref().map(|rope| rope.to_string()),
1275 Some(hunk_staging_operation_count),
1276 cx,
1277 )
1278 });
1279 let diff = diff.downgrade();
1280 cx.spawn(async move |this, cx| {
1281 if let Ok(Err(error)) = cx.background_spawn(recv).await {
1282 diff.update(cx, |diff, cx| {
1283 diff.clear_pending_hunks(cx);
1284 })
1285 .ok();
1286 this.update(cx, |_, cx| cx.emit(GitStoreEvent::IndexWriteError(error)))
1287 .ok();
1288 }
1289 })
1290 .detach();
1291 }
1292 }
1293 }
1294 }
1295
1296 fn local_worktree_git_repos_changed(
1297 &mut self,
1298 worktree: Entity<Worktree>,
1299 changed_repos: &UpdatedGitRepositoriesSet,
1300 cx: &mut Context<Self>,
1301 ) {
1302 log::debug!("local worktree repos changed");
1303 debug_assert!(worktree.read(cx).is_local());
1304
1305 for repository in self.repositories.values() {
1306 repository.update(cx, |repository, cx| {
1307 let repo_abs_path = &repository.work_directory_abs_path;
1308 if changed_repos.iter().any(|update| {
1309 update.old_work_directory_abs_path.as_ref() == Some(&repo_abs_path)
1310 || update.new_work_directory_abs_path.as_ref() == Some(&repo_abs_path)
1311 }) {
1312 repository.reload_buffer_diff_bases(cx);
1313 }
1314 });
1315 }
1316 }
1317
1318 pub fn repositories(&self) -> &HashMap<RepositoryId, Entity<Repository>> {
1319 &self.repositories
1320 }
1321
1322 pub fn status_for_buffer_id(&self, buffer_id: BufferId, cx: &App) -> Option<FileStatus> {
1323 let (repo, path) = self.repository_and_path_for_buffer_id(buffer_id, cx)?;
1324 let status = repo.read(cx).snapshot.status_for_path(&path)?;
1325 Some(status.status)
1326 }
1327
1328 pub fn repository_and_path_for_buffer_id(
1329 &self,
1330 buffer_id: BufferId,
1331 cx: &App,
1332 ) -> Option<(Entity<Repository>, RepoPath)> {
1333 let buffer = self.buffer_store.read(cx).get(buffer_id)?;
1334 let project_path = buffer.read(cx).project_path(cx)?;
1335 self.repository_and_path_for_project_path(&project_path, cx)
1336 }
1337
1338 pub fn repository_and_path_for_project_path(
1339 &self,
1340 path: &ProjectPath,
1341 cx: &App,
1342 ) -> Option<(Entity<Repository>, RepoPath)> {
1343 let abs_path = self.worktree_store.read(cx).absolutize(path, cx)?;
1344 self.repositories
1345 .values()
1346 .filter_map(|repo| {
1347 let repo_path = repo.read(cx).abs_path_to_repo_path(&abs_path)?;
1348 Some((repo.clone(), repo_path))
1349 })
1350 .max_by_key(|(repo, _)| repo.read(cx).work_directory_abs_path.clone())
1351 }
1352
1353 pub fn git_init(
1354 &self,
1355 path: Arc<Path>,
1356 fallback_branch_name: String,
1357 cx: &App,
1358 ) -> Task<Result<()>> {
1359 match &self.state {
1360 GitStoreState::Local { fs, .. } => {
1361 let fs = fs.clone();
1362 cx.background_executor()
1363 .spawn(async move { fs.git_init(&path, fallback_branch_name) })
1364 }
1365 GitStoreState::Ssh {
1366 upstream_client,
1367 upstream_project_id: project_id,
1368 ..
1369 }
1370 | GitStoreState::Remote {
1371 upstream_client,
1372 upstream_project_id: project_id,
1373 ..
1374 } => {
1375 let client = upstream_client.clone();
1376 let project_id = *project_id;
1377 cx.background_executor().spawn(async move {
1378 client
1379 .request(proto::GitInit {
1380 project_id: project_id.0,
1381 abs_path: path.to_string_lossy().to_string(),
1382 fallback_branch_name,
1383 })
1384 .await?;
1385 Ok(())
1386 })
1387 }
1388 }
1389 }
1390
1391 async fn handle_update_repository(
1392 this: Entity<Self>,
1393 envelope: TypedEnvelope<proto::UpdateRepository>,
1394 mut cx: AsyncApp,
1395 ) -> Result<()> {
1396 this.update(&mut cx, |this, cx| {
1397 let mut update = envelope.payload;
1398
1399 let id = RepositoryId::from_proto(update.id);
1400 let client = this
1401 .upstream_client()
1402 .context("no upstream client")?
1403 .clone();
1404
1405 let mut is_new = false;
1406 let repo = this.repositories.entry(id).or_insert_with(|| {
1407 is_new = true;
1408 let git_store = cx.weak_entity();
1409 cx.new(|cx| {
1410 Repository::remote(
1411 id,
1412 Path::new(&update.abs_path).into(),
1413 ProjectId(update.project_id),
1414 client,
1415 git_store,
1416 cx,
1417 )
1418 })
1419 });
1420 if is_new {
1421 this._subscriptions
1422 .push(cx.subscribe(&repo, Self::on_repository_event))
1423 }
1424
1425 repo.update(cx, {
1426 let update = update.clone();
1427 |repo, cx| repo.apply_remote_update(update, cx)
1428 })?;
1429
1430 this.active_repo_id.get_or_insert_with(|| {
1431 cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
1432 id
1433 });
1434
1435 if let Some((client, project_id)) = this.downstream_client() {
1436 update.project_id = project_id.to_proto();
1437 client.send(update).log_err();
1438 }
1439 Ok(())
1440 })?
1441 }
1442
1443 async fn handle_remove_repository(
1444 this: Entity<Self>,
1445 envelope: TypedEnvelope<proto::RemoveRepository>,
1446 mut cx: AsyncApp,
1447 ) -> Result<()> {
1448 this.update(&mut cx, |this, cx| {
1449 let mut update = envelope.payload;
1450 let id = RepositoryId::from_proto(update.id);
1451 this.repositories.remove(&id);
1452 if let Some((client, project_id)) = this.downstream_client() {
1453 update.project_id = project_id.to_proto();
1454 client.send(update).log_err();
1455 }
1456 if this.active_repo_id == Some(id) {
1457 this.active_repo_id = None;
1458 cx.emit(GitStoreEvent::ActiveRepositoryChanged(None));
1459 }
1460 cx.emit(GitStoreEvent::RepositoryRemoved(id));
1461 })
1462 }
1463
1464 async fn handle_git_init(
1465 this: Entity<Self>,
1466 envelope: TypedEnvelope<proto::GitInit>,
1467 cx: AsyncApp,
1468 ) -> Result<proto::Ack> {
1469 let path: Arc<Path> = PathBuf::from(envelope.payload.abs_path).into();
1470 let name = envelope.payload.fallback_branch_name;
1471 cx.update(|cx| this.read(cx).git_init(path, name, cx))?
1472 .await?;
1473
1474 Ok(proto::Ack {})
1475 }
1476
1477 async fn handle_fetch(
1478 this: Entity<Self>,
1479 envelope: TypedEnvelope<proto::Fetch>,
1480 mut cx: AsyncApp,
1481 ) -> Result<proto::RemoteMessageResponse> {
1482 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1483 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1484 let askpass_id = envelope.payload.askpass_id;
1485
1486 let askpass = make_remote_delegate(
1487 this,
1488 envelope.payload.project_id,
1489 repository_id,
1490 askpass_id,
1491 &mut cx,
1492 );
1493
1494 let remote_output = repository_handle
1495 .update(&mut cx, |repository_handle, cx| {
1496 repository_handle.fetch(askpass, cx)
1497 })?
1498 .await??;
1499
1500 Ok(proto::RemoteMessageResponse {
1501 stdout: remote_output.stdout,
1502 stderr: remote_output.stderr,
1503 })
1504 }
1505
1506 async fn handle_push(
1507 this: Entity<Self>,
1508 envelope: TypedEnvelope<proto::Push>,
1509 mut cx: AsyncApp,
1510 ) -> Result<proto::RemoteMessageResponse> {
1511 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1512 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1513
1514 let askpass_id = envelope.payload.askpass_id;
1515 let askpass = make_remote_delegate(
1516 this,
1517 envelope.payload.project_id,
1518 repository_id,
1519 askpass_id,
1520 &mut cx,
1521 );
1522
1523 let options = envelope
1524 .payload
1525 .options
1526 .as_ref()
1527 .map(|_| match envelope.payload.options() {
1528 proto::push::PushOptions::SetUpstream => git::repository::PushOptions::SetUpstream,
1529 proto::push::PushOptions::Force => git::repository::PushOptions::Force,
1530 });
1531
1532 let branch_name = envelope.payload.branch_name.into();
1533 let remote_name = envelope.payload.remote_name.into();
1534
1535 let remote_output = repository_handle
1536 .update(&mut cx, |repository_handle, cx| {
1537 repository_handle.push(branch_name, remote_name, options, askpass, cx)
1538 })?
1539 .await??;
1540 Ok(proto::RemoteMessageResponse {
1541 stdout: remote_output.stdout,
1542 stderr: remote_output.stderr,
1543 })
1544 }
1545
1546 async fn handle_pull(
1547 this: Entity<Self>,
1548 envelope: TypedEnvelope<proto::Pull>,
1549 mut cx: AsyncApp,
1550 ) -> Result<proto::RemoteMessageResponse> {
1551 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1552 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1553 let askpass_id = envelope.payload.askpass_id;
1554 let askpass = make_remote_delegate(
1555 this,
1556 envelope.payload.project_id,
1557 repository_id,
1558 askpass_id,
1559 &mut cx,
1560 );
1561
1562 let branch_name = envelope.payload.branch_name.into();
1563 let remote_name = envelope.payload.remote_name.into();
1564
1565 let remote_message = repository_handle
1566 .update(&mut cx, |repository_handle, cx| {
1567 repository_handle.pull(branch_name, remote_name, askpass, cx)
1568 })?
1569 .await??;
1570
1571 Ok(proto::RemoteMessageResponse {
1572 stdout: remote_message.stdout,
1573 stderr: remote_message.stderr,
1574 })
1575 }
1576
1577 async fn handle_stage(
1578 this: Entity<Self>,
1579 envelope: TypedEnvelope<proto::Stage>,
1580 mut cx: AsyncApp,
1581 ) -> Result<proto::Ack> {
1582 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1583 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1584
1585 let entries = envelope
1586 .payload
1587 .paths
1588 .into_iter()
1589 .map(PathBuf::from)
1590 .map(RepoPath::new)
1591 .collect();
1592
1593 repository_handle
1594 .update(&mut cx, |repository_handle, cx| {
1595 repository_handle.stage_entries(entries, cx)
1596 })?
1597 .await?;
1598 Ok(proto::Ack {})
1599 }
1600
1601 async fn handle_unstage(
1602 this: Entity<Self>,
1603 envelope: TypedEnvelope<proto::Unstage>,
1604 mut cx: AsyncApp,
1605 ) -> Result<proto::Ack> {
1606 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1607 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1608
1609 let entries = envelope
1610 .payload
1611 .paths
1612 .into_iter()
1613 .map(PathBuf::from)
1614 .map(RepoPath::new)
1615 .collect();
1616
1617 repository_handle
1618 .update(&mut cx, |repository_handle, cx| {
1619 repository_handle.unstage_entries(entries, cx)
1620 })?
1621 .await?;
1622
1623 Ok(proto::Ack {})
1624 }
1625
1626 async fn handle_set_index_text(
1627 this: Entity<Self>,
1628 envelope: TypedEnvelope<proto::SetIndexText>,
1629 mut cx: AsyncApp,
1630 ) -> Result<proto::Ack> {
1631 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1632 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1633 let repo_path = RepoPath::from_str(&envelope.payload.path);
1634
1635 repository_handle
1636 .update(&mut cx, |repository_handle, cx| {
1637 repository_handle.spawn_set_index_text_job(
1638 repo_path,
1639 envelope.payload.text,
1640 None,
1641 cx,
1642 )
1643 })?
1644 .await??;
1645 Ok(proto::Ack {})
1646 }
1647
1648 async fn handle_commit(
1649 this: Entity<Self>,
1650 envelope: TypedEnvelope<proto::Commit>,
1651 mut cx: AsyncApp,
1652 ) -> Result<proto::Ack> {
1653 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1654 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1655
1656 let message = SharedString::from(envelope.payload.message);
1657 let name = envelope.payload.name.map(SharedString::from);
1658 let email = envelope.payload.email.map(SharedString::from);
1659 let options = envelope.payload.options.unwrap_or_default();
1660
1661 repository_handle
1662 .update(&mut cx, |repository_handle, cx| {
1663 repository_handle.commit(
1664 message,
1665 name.zip(email),
1666 CommitOptions {
1667 amend: options.amend,
1668 },
1669 cx,
1670 )
1671 })?
1672 .await??;
1673 Ok(proto::Ack {})
1674 }
1675
1676 async fn handle_get_remotes(
1677 this: Entity<Self>,
1678 envelope: TypedEnvelope<proto::GetRemotes>,
1679 mut cx: AsyncApp,
1680 ) -> Result<proto::GetRemotesResponse> {
1681 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1682 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1683
1684 let branch_name = envelope.payload.branch_name;
1685
1686 let remotes = repository_handle
1687 .update(&mut cx, |repository_handle, _| {
1688 repository_handle.get_remotes(branch_name)
1689 })?
1690 .await??;
1691
1692 Ok(proto::GetRemotesResponse {
1693 remotes: remotes
1694 .into_iter()
1695 .map(|remotes| proto::get_remotes_response::Remote {
1696 name: remotes.name.to_string(),
1697 })
1698 .collect::<Vec<_>>(),
1699 })
1700 }
1701
1702 async fn handle_get_branches(
1703 this: Entity<Self>,
1704 envelope: TypedEnvelope<proto::GitGetBranches>,
1705 mut cx: AsyncApp,
1706 ) -> Result<proto::GitBranchesResponse> {
1707 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1708 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1709
1710 let branches = repository_handle
1711 .update(&mut cx, |repository_handle, _| repository_handle.branches())?
1712 .await??;
1713
1714 Ok(proto::GitBranchesResponse {
1715 branches: branches
1716 .into_iter()
1717 .map(|branch| branch_to_proto(&branch))
1718 .collect::<Vec<_>>(),
1719 })
1720 }
1721 async fn handle_create_branch(
1722 this: Entity<Self>,
1723 envelope: TypedEnvelope<proto::GitCreateBranch>,
1724 mut cx: AsyncApp,
1725 ) -> Result<proto::Ack> {
1726 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1727 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1728 let branch_name = envelope.payload.branch_name;
1729
1730 repository_handle
1731 .update(&mut cx, |repository_handle, _| {
1732 repository_handle.create_branch(branch_name)
1733 })?
1734 .await??;
1735
1736 Ok(proto::Ack {})
1737 }
1738
1739 async fn handle_change_branch(
1740 this: Entity<Self>,
1741 envelope: TypedEnvelope<proto::GitChangeBranch>,
1742 mut cx: AsyncApp,
1743 ) -> Result<proto::Ack> {
1744 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1745 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1746 let branch_name = envelope.payload.branch_name;
1747
1748 repository_handle
1749 .update(&mut cx, |repository_handle, _| {
1750 repository_handle.change_branch(branch_name)
1751 })?
1752 .await??;
1753
1754 Ok(proto::Ack {})
1755 }
1756
1757 async fn handle_show(
1758 this: Entity<Self>,
1759 envelope: TypedEnvelope<proto::GitShow>,
1760 mut cx: AsyncApp,
1761 ) -> Result<proto::GitCommitDetails> {
1762 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1763 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1764
1765 let commit = repository_handle
1766 .update(&mut cx, |repository_handle, _| {
1767 repository_handle.show(envelope.payload.commit)
1768 })?
1769 .await??;
1770 Ok(proto::GitCommitDetails {
1771 sha: commit.sha.into(),
1772 message: commit.message.into(),
1773 commit_timestamp: commit.commit_timestamp,
1774 author_email: commit.author_email.into(),
1775 author_name: commit.author_name.into(),
1776 })
1777 }
1778
1779 async fn handle_load_commit_diff(
1780 this: Entity<Self>,
1781 envelope: TypedEnvelope<proto::LoadCommitDiff>,
1782 mut cx: AsyncApp,
1783 ) -> Result<proto::LoadCommitDiffResponse> {
1784 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1785 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1786
1787 let commit_diff = repository_handle
1788 .update(&mut cx, |repository_handle, _| {
1789 repository_handle.load_commit_diff(envelope.payload.commit)
1790 })?
1791 .await??;
1792 Ok(proto::LoadCommitDiffResponse {
1793 files: commit_diff
1794 .files
1795 .into_iter()
1796 .map(|file| proto::CommitFile {
1797 path: file.path.to_string(),
1798 old_text: file.old_text,
1799 new_text: file.new_text,
1800 })
1801 .collect(),
1802 })
1803 }
1804
1805 async fn handle_reset(
1806 this: Entity<Self>,
1807 envelope: TypedEnvelope<proto::GitReset>,
1808 mut cx: AsyncApp,
1809 ) -> Result<proto::Ack> {
1810 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1811 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1812
1813 let mode = match envelope.payload.mode() {
1814 git_reset::ResetMode::Soft => ResetMode::Soft,
1815 git_reset::ResetMode::Mixed => ResetMode::Mixed,
1816 };
1817
1818 repository_handle
1819 .update(&mut cx, |repository_handle, cx| {
1820 repository_handle.reset(envelope.payload.commit, mode, cx)
1821 })?
1822 .await??;
1823 Ok(proto::Ack {})
1824 }
1825
1826 async fn handle_checkout_files(
1827 this: Entity<Self>,
1828 envelope: TypedEnvelope<proto::GitCheckoutFiles>,
1829 mut cx: AsyncApp,
1830 ) -> Result<proto::Ack> {
1831 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1832 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1833 let paths = envelope
1834 .payload
1835 .paths
1836 .iter()
1837 .map(|s| RepoPath::from_str(s))
1838 .collect();
1839
1840 repository_handle
1841 .update(&mut cx, |repository_handle, cx| {
1842 repository_handle.checkout_files(&envelope.payload.commit, paths, cx)
1843 })?
1844 .await??;
1845 Ok(proto::Ack {})
1846 }
1847
1848 async fn handle_open_commit_message_buffer(
1849 this: Entity<Self>,
1850 envelope: TypedEnvelope<proto::OpenCommitMessageBuffer>,
1851 mut cx: AsyncApp,
1852 ) -> Result<proto::OpenBufferResponse> {
1853 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1854 let repository = Self::repository_for_request(&this, repository_id, &mut cx)?;
1855 let buffer = repository
1856 .update(&mut cx, |repository, cx| {
1857 repository.open_commit_buffer(None, this.read(cx).buffer_store.clone(), cx)
1858 })?
1859 .await?;
1860
1861 let buffer_id = buffer.read_with(&cx, |buffer, _| buffer.remote_id())?;
1862 this.update(&mut cx, |this, cx| {
1863 this.buffer_store.update(cx, |buffer_store, cx| {
1864 buffer_store
1865 .create_buffer_for_peer(
1866 &buffer,
1867 envelope.original_sender_id.unwrap_or(envelope.sender_id),
1868 cx,
1869 )
1870 .detach_and_log_err(cx);
1871 })
1872 })?;
1873
1874 Ok(proto::OpenBufferResponse {
1875 buffer_id: buffer_id.to_proto(),
1876 })
1877 }
1878
1879 async fn handle_askpass(
1880 this: Entity<Self>,
1881 envelope: TypedEnvelope<proto::AskPassRequest>,
1882 mut cx: AsyncApp,
1883 ) -> Result<proto::AskPassResponse> {
1884 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1885 let repository = Self::repository_for_request(&this, repository_id, &mut cx)?;
1886
1887 let delegates = cx.update(|cx| repository.read(cx).askpass_delegates.clone())?;
1888 let Some(mut askpass) = delegates.lock().remove(&envelope.payload.askpass_id) else {
1889 debug_panic!("no askpass found");
1890 return Err(anyhow::anyhow!("no askpass found"));
1891 };
1892
1893 let response = askpass.ask_password(envelope.payload.prompt).await?;
1894
1895 delegates
1896 .lock()
1897 .insert(envelope.payload.askpass_id, askpass);
1898
1899 Ok(proto::AskPassResponse { response })
1900 }
1901
1902 async fn handle_check_for_pushed_commits(
1903 this: Entity<Self>,
1904 envelope: TypedEnvelope<proto::CheckForPushedCommits>,
1905 mut cx: AsyncApp,
1906 ) -> Result<proto::CheckForPushedCommitsResponse> {
1907 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1908 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1909
1910 let branches = repository_handle
1911 .update(&mut cx, |repository_handle, _| {
1912 repository_handle.check_for_pushed_commits()
1913 })?
1914 .await??;
1915 Ok(proto::CheckForPushedCommitsResponse {
1916 pushed_to: branches
1917 .into_iter()
1918 .map(|commit| commit.to_string())
1919 .collect(),
1920 })
1921 }
1922
1923 async fn handle_git_diff(
1924 this: Entity<Self>,
1925 envelope: TypedEnvelope<proto::GitDiff>,
1926 mut cx: AsyncApp,
1927 ) -> Result<proto::GitDiffResponse> {
1928 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1929 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1930 let diff_type = match envelope.payload.diff_type() {
1931 proto::git_diff::DiffType::HeadToIndex => DiffType::HeadToIndex,
1932 proto::git_diff::DiffType::HeadToWorktree => DiffType::HeadToWorktree,
1933 };
1934
1935 let mut diff = repository_handle
1936 .update(&mut cx, |repository_handle, cx| {
1937 repository_handle.diff(diff_type, cx)
1938 })?
1939 .await??;
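        // Cap the diff sent back over RPC at roughly one megabyte of text so the
        // response stays bounded for very large diffs.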
1940 const ONE_MB: usize = 1_000_000;
1941 if diff.len() > ONE_MB {
1942 diff = diff.chars().take(ONE_MB).collect()
1943 }
1944
1945 Ok(proto::GitDiffResponse { diff })
1946 }
1947
1948 async fn handle_open_unstaged_diff(
1949 this: Entity<Self>,
1950 request: TypedEnvelope<proto::OpenUnstagedDiff>,
1951 mut cx: AsyncApp,
1952 ) -> Result<proto::OpenUnstagedDiffResponse> {
1953 let buffer_id = BufferId::new(request.payload.buffer_id)?;
1954 let diff = this
1955 .update(&mut cx, |this, cx| {
1956 let buffer = this.buffer_store.read(cx).get(buffer_id)?;
1957 Some(this.open_unstaged_diff(buffer, cx))
1958 })?
1959 .ok_or_else(|| anyhow!("no such buffer"))?
1960 .await?;
1961 this.update(&mut cx, |this, _| {
1962 let shared_diffs = this
1963 .shared_diffs
1964 .entry(request.original_sender_id.unwrap_or(request.sender_id))
1965 .or_default();
1966 shared_diffs.entry(buffer_id).or_default().unstaged = Some(diff.clone());
1967 })?;
1968 let staged_text = diff.read_with(&cx, |diff, _| diff.base_text_string())?;
1969 Ok(proto::OpenUnstagedDiffResponse { staged_text })
1970 }
1971
1972 async fn handle_open_uncommitted_diff(
1973 this: Entity<Self>,
1974 request: TypedEnvelope<proto::OpenUncommittedDiff>,
1975 mut cx: AsyncApp,
1976 ) -> Result<proto::OpenUncommittedDiffResponse> {
1977 let buffer_id = BufferId::new(request.payload.buffer_id)?;
1978 let diff = this
1979 .update(&mut cx, |this, cx| {
1980 let buffer = this.buffer_store.read(cx).get(buffer_id)?;
1981 Some(this.open_uncommitted_diff(buffer, cx))
1982 })?
1983 .ok_or_else(|| anyhow!("no such buffer"))?
1984 .await?;
1985 this.update(&mut cx, |this, _| {
1986 let shared_diffs = this
1987 .shared_diffs
1988 .entry(request.original_sender_id.unwrap_or(request.sender_id))
1989 .or_default();
1990 shared_diffs.entry(buffer_id).or_default().uncommitted = Some(diff.clone());
1991 })?;
1992 diff.read_with(&cx, |diff, cx| {
1993 use proto::open_uncommitted_diff_response::Mode;
1994
1995 let unstaged_diff = diff.secondary_diff();
1996 let index_snapshot = unstaged_diff.and_then(|diff| {
1997 let diff = diff.read(cx);
1998 diff.base_text_exists().then(|| diff.base_text())
1999 });
2000
2001 let mode;
2002 let staged_text;
2003 let committed_text;
2004 if diff.base_text_exists() {
2005 let committed_snapshot = diff.base_text();
2006 committed_text = Some(committed_snapshot.text());
2007 if let Some(index_text) = index_snapshot {
2008 if index_text.remote_id() == committed_snapshot.remote_id() {
2009 mode = Mode::IndexMatchesHead;
2010 staged_text = None;
2011 } else {
2012 mode = Mode::IndexAndHead;
2013 staged_text = Some(index_text.text());
2014 }
2015 } else {
2016 mode = Mode::IndexAndHead;
2017 staged_text = None;
2018 }
2019 } else {
2020 mode = Mode::IndexAndHead;
2021 committed_text = None;
2022 staged_text = index_snapshot.as_ref().map(|buffer| buffer.text());
2023 }
2024
2025 proto::OpenUncommittedDiffResponse {
2026 committed_text,
2027 staged_text,
2028 mode: mode.into(),
2029 }
2030 })
2031 }
2032
2033 async fn handle_update_diff_bases(
2034 this: Entity<Self>,
2035 request: TypedEnvelope<proto::UpdateDiffBases>,
2036 mut cx: AsyncApp,
2037 ) -> Result<()> {
2038 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2039 this.update(&mut cx, |this, cx| {
2040 if let Some(diff_state) = this.diffs.get_mut(&buffer_id) {
2041 if let Some(buffer) = this.buffer_store.read(cx).get(buffer_id) {
2042 let buffer = buffer.read(cx).text_snapshot();
2043 diff_state.update(cx, |diff_state, cx| {
2044 diff_state.handle_base_texts_updated(buffer, request.payload, cx);
2045 })
2046 }
2047 }
2048 })
2049 }
2050
2051 async fn handle_blame_buffer(
2052 this: Entity<Self>,
2053 envelope: TypedEnvelope<proto::BlameBuffer>,
2054 mut cx: AsyncApp,
2055 ) -> Result<proto::BlameBufferResponse> {
2056 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2057 let version = deserialize_version(&envelope.payload.version);
2058 let buffer = this.read_with(&cx, |this, cx| {
2059 this.buffer_store.read(cx).get_existing(buffer_id)
2060 })??;
2061 buffer
2062 .update(&mut cx, |buffer, _| {
2063 buffer.wait_for_version(version.clone())
2064 })?
2065 .await?;
2066 let blame = this
2067 .update(&mut cx, |this, cx| {
2068 this.blame_buffer(&buffer, Some(version), cx)
2069 })?
2070 .await?;
2071 Ok(serialize_blame_buffer_response(blame))
2072 }
2073
2074 async fn handle_get_permalink_to_line(
2075 this: Entity<Self>,
2076 envelope: TypedEnvelope<proto::GetPermalinkToLine>,
2077 mut cx: AsyncApp,
2078 ) -> Result<proto::GetPermalinkToLineResponse> {
2079 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2080 // let version = deserialize_version(&envelope.payload.version);
2081 let selection = {
2082 let proto_selection = envelope
2083 .payload
2084 .selection
2085                .context("no selection defined to get permalink for")?;
2086 proto_selection.start as u32..proto_selection.end as u32
2087 };
2088 let buffer = this.read_with(&cx, |this, cx| {
2089 this.buffer_store.read(cx).get_existing(buffer_id)
2090 })??;
2091 let permalink = this
2092 .update(&mut cx, |this, cx| {
2093 this.get_permalink_to_line(&buffer, selection, cx)
2094 })?
2095 .await?;
2096 Ok(proto::GetPermalinkToLineResponse {
2097 permalink: permalink.to_string(),
2098 })
2099 }
2100
2101 fn repository_for_request(
2102 this: &Entity<Self>,
2103 id: RepositoryId,
2104 cx: &mut AsyncApp,
2105 ) -> Result<Entity<Repository>> {
2106 this.update(cx, |this, _| {
2107 this.repositories
2108 .get(&id)
2109 .context("missing repository handle")
2110 .cloned()
2111 })?
2112 }
2113
2114 pub fn repo_snapshots(&self, cx: &App) -> HashMap<RepositoryId, RepositorySnapshot> {
2115 self.repositories
2116 .iter()
2117 .map(|(id, repo)| (*id, repo.read(cx).snapshot.clone()))
2118 .collect()
2119 }
2120}
2121
2122impl BufferDiffState {
2123 fn buffer_language_changed(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) {
2124 self.language = buffer.read(cx).language().cloned();
2125 self.language_changed = true;
2126 let _ = self.recalculate_diffs(buffer.read(cx).text_snapshot(), cx);
2127 }
2128
2129 fn unstaged_diff(&self) -> Option<Entity<BufferDiff>> {
2130 self.unstaged_diff.as_ref().and_then(|set| set.upgrade())
2131 }
2132
2133 fn uncommitted_diff(&self) -> Option<Entity<BufferDiff>> {
2134 self.uncommitted_diff.as_ref().and_then(|set| set.upgrade())
2135 }
2136
2137 fn handle_base_texts_updated(
2138 &mut self,
2139 buffer: text::BufferSnapshot,
2140 message: proto::UpdateDiffBases,
2141 cx: &mut Context<Self>,
2142 ) {
2143 use proto::update_diff_bases::Mode;
2144
2145 let Some(mode) = Mode::from_i32(message.mode) else {
2146 return;
2147 };
2148
2149 let diff_bases_change = match mode {
2150 Mode::HeadOnly => DiffBasesChange::SetHead(message.committed_text),
2151 Mode::IndexOnly => DiffBasesChange::SetIndex(message.staged_text),
2152 Mode::IndexMatchesHead => DiffBasesChange::SetBoth(message.committed_text),
2153 Mode::IndexAndHead => DiffBasesChange::SetEach {
2154 index: message.staged_text,
2155 head: message.committed_text,
2156 },
2157 };
2158
2159 self.diff_bases_changed(buffer, Some(diff_bases_change), cx);
2160 }
2161
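    /// Returns a future that resolves once the in-flight diff recalculation
    /// finishes, or `None` if no recalculation is currently running.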
2162 pub fn wait_for_recalculation(&mut self) -> Option<impl Future<Output = ()> + use<>> {
2163 if *self.recalculating_tx.borrow() {
2164 let mut rx = self.recalculating_tx.subscribe();
2165 return Some(async move {
2166 loop {
2167 let is_recalculating = rx.recv().await;
2168 if is_recalculating != Some(true) {
2169 break;
2170 }
2171 }
2172 });
2173 } else {
2174 None
2175 }
2176 }
2177
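    /// Records new index and/or HEAD base texts (normalizing their line
    /// endings) and kicks off a diff recalculation against the given buffer
    /// snapshot.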
2178 fn diff_bases_changed(
2179 &mut self,
2180 buffer: text::BufferSnapshot,
2181 diff_bases_change: Option<DiffBasesChange>,
2182 cx: &mut Context<Self>,
2183 ) {
2184 match diff_bases_change {
2185 Some(DiffBasesChange::SetIndex(index)) => {
2186 self.index_text = index.map(|mut index| {
2187 text::LineEnding::normalize(&mut index);
2188 Arc::new(index)
2189 });
2190 self.index_changed = true;
2191 }
2192 Some(DiffBasesChange::SetHead(head)) => {
2193 self.head_text = head.map(|mut head| {
2194 text::LineEnding::normalize(&mut head);
2195 Arc::new(head)
2196 });
2197 self.head_changed = true;
2198 }
2199 Some(DiffBasesChange::SetBoth(text)) => {
2200 let text = text.map(|mut text| {
2201 text::LineEnding::normalize(&mut text);
2202 Arc::new(text)
2203 });
2204 self.head_text = text.clone();
2205 self.index_text = text;
2206 self.head_changed = true;
2207 self.index_changed = true;
2208 }
2209 Some(DiffBasesChange::SetEach { index, head }) => {
2210 self.index_text = index.map(|mut index| {
2211 text::LineEnding::normalize(&mut index);
2212 Arc::new(index)
2213 });
2214 self.index_changed = true;
2215 self.head_text = head.map(|mut head| {
2216 text::LineEnding::normalize(&mut head);
2217 Arc::new(head)
2218 });
2219 self.head_changed = true;
2220 }
2221 None => {}
2222 }
2223
2224 self.recalculate_diffs(buffer, cx)
2225 }
2226
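    /// Spawns a task that recomputes the unstaged and uncommitted diffs from
    /// the current base texts. The update is abandoned if hunk staging
    /// operations are still outstanding, i.e. the operation count has advanced
    /// past the last completed index write.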
2227 fn recalculate_diffs(&mut self, buffer: text::BufferSnapshot, cx: &mut Context<Self>) {
2228 *self.recalculating_tx.borrow_mut() = true;
2229
2230 let language = self.language.clone();
2231 let language_registry = self.language_registry.clone();
2232 let unstaged_diff = self.unstaged_diff();
2233 let uncommitted_diff = self.uncommitted_diff();
2234 let head = self.head_text.clone();
2235 let index = self.index_text.clone();
2236 let index_changed = self.index_changed;
2237 let head_changed = self.head_changed;
2238 let language_changed = self.language_changed;
2239 let prev_hunk_staging_operation_count = self.hunk_staging_operation_count_as_of_write;
2240 let index_matches_head = match (self.index_text.as_ref(), self.head_text.as_ref()) {
2241 (Some(index), Some(head)) => Arc::ptr_eq(index, head),
2242 (None, None) => true,
2243 _ => false,
2244 };
2245 self.recalculate_diff_task = Some(cx.spawn(async move |this, cx| {
2246 log::debug!(
2247 "start recalculating diffs for buffer {}",
2248 buffer.remote_id()
2249 );
2250
2251 let mut new_unstaged_diff = None;
2252 if let Some(unstaged_diff) = &unstaged_diff {
2253 new_unstaged_diff = Some(
2254 BufferDiff::update_diff(
2255 unstaged_diff.clone(),
2256 buffer.clone(),
2257 index,
2258 index_changed,
2259 language_changed,
2260 language.clone(),
2261 language_registry.clone(),
2262 cx,
2263 )
2264 .await?,
2265 );
2266 }
2267
2268 let mut new_uncommitted_diff = None;
2269 if let Some(uncommitted_diff) = &uncommitted_diff {
2270 new_uncommitted_diff = if index_matches_head {
2271 new_unstaged_diff.clone()
2272 } else {
2273 Some(
2274 BufferDiff::update_diff(
2275 uncommitted_diff.clone(),
2276 buffer.clone(),
2277 head,
2278 head_changed,
2279 language_changed,
2280 language.clone(),
2281 language_registry.clone(),
2282 cx,
2283 )
2284 .await?,
2285 )
2286 }
2287 }
2288
2289 let cancel = this.update(cx, |this, _| {
2290 // This checks whether all pending stage/unstage operations
2291 // have quiesced (i.e. both the corresponding write and the
2292 // read of that write have completed). If not, then we cancel
2293 // this recalculation attempt to avoid invalidating pending
2294 // state too quickly; another recalculation will come along
2295 // later and clear the pending state once the state of the index has settled.
2296 if this.hunk_staging_operation_count > prev_hunk_staging_operation_count {
2297 *this.recalculating_tx.borrow_mut() = false;
2298 true
2299 } else {
2300 false
2301 }
2302 })?;
2303 if cancel {
2304 log::debug!(
2305                    concat!(
2306                        "aborting recalculating diffs for buffer {} ",
2307                        "due to subsequent hunk operations",
2308                    ),
2309 buffer.remote_id()
2310 );
2311 return Ok(());
2312 }
2313
2314 let unstaged_changed_range = if let Some((unstaged_diff, new_unstaged_diff)) =
2315 unstaged_diff.as_ref().zip(new_unstaged_diff.clone())
2316 {
2317 unstaged_diff.update(cx, |diff, cx| {
2318 if language_changed {
2319 diff.language_changed(cx);
2320 }
2321 diff.set_snapshot(new_unstaged_diff, &buffer, cx)
2322 })?
2323 } else {
2324 None
2325 };
2326
2327 if let Some((uncommitted_diff, new_uncommitted_diff)) =
2328 uncommitted_diff.as_ref().zip(new_uncommitted_diff.clone())
2329 {
2330 uncommitted_diff.update(cx, |diff, cx| {
2331 if language_changed {
2332 diff.language_changed(cx);
2333 }
2334 diff.set_snapshot_with_secondary(
2335 new_uncommitted_diff,
2336 &buffer,
2337 unstaged_changed_range,
2338 true,
2339 cx,
2340 );
2341 })?;
2342 }
2343
2344 log::debug!(
2345 "finished recalculating diffs for buffer {}",
2346 buffer.remote_id()
2347 );
2348
2349 if let Some(this) = this.upgrade() {
2350 this.update(cx, |this, _| {
2351 this.index_changed = false;
2352 this.head_changed = false;
2353 this.language_changed = false;
2354 *this.recalculating_tx.borrow_mut() = false;
2355 })?;
2356 }
2357
2358 Ok(())
2359 }));
2360 }
2361}
2362
2363impl Default for BufferDiffState {
2364 fn default() -> Self {
2365 Self {
2366 unstaged_diff: Default::default(),
2367 uncommitted_diff: Default::default(),
2368 recalculate_diff_task: Default::default(),
2369 language: Default::default(),
2370 language_registry: Default::default(),
2371 recalculating_tx: postage::watch::channel_with(false).0,
2372 hunk_staging_operation_count: 0,
2373 hunk_staging_operation_count_as_of_write: 0,
2374 head_text: Default::default(),
2375 index_text: Default::default(),
2376 head_changed: Default::default(),
2377 index_changed: Default::default(),
2378 language_changed: Default::default(),
2379 }
2380 }
2381}
2382
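/// Builds an `AskPassDelegate` that forwards authentication prompts from a
/// local git operation to the downstream client over RPC and relays the
/// response back.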
2383fn make_remote_delegate(
2384 this: Entity<GitStore>,
2385 project_id: u64,
2386 repository_id: RepositoryId,
2387 askpass_id: u64,
2388 cx: &mut AsyncApp,
2389) -> AskPassDelegate {
2390 AskPassDelegate::new(cx, move |prompt, tx, cx| {
2391 this.update(cx, |this, cx| {
2392 let Some((client, _)) = this.downstream_client() else {
2393 return;
2394 };
2395 let response = client.request(proto::AskPassRequest {
2396 project_id,
2397 repository_id: repository_id.to_proto(),
2398 askpass_id,
2399 prompt,
2400 });
2401 cx.spawn(async move |_, _| {
2402 tx.send(response.await?.response).ok();
2403 anyhow::Ok(())
2404 })
2405 .detach_and_log_err(cx);
2406 })
2407 .log_err();
2408 })
2409}
2410
2411impl RepositoryId {
2412 pub fn to_proto(self) -> u64 {
2413 self.0
2414 }
2415
2416 pub fn from_proto(id: u64) -> Self {
2417 RepositoryId(id)
2418 }
2419}
2420
2421impl RepositorySnapshot {
2422 fn empty(id: RepositoryId, work_directory_abs_path: Arc<Path>) -> Self {
2423 Self {
2424 id,
2425 merge_message: None,
2426 statuses_by_path: Default::default(),
2427 work_directory_abs_path,
2428 branch: None,
2429 merge_conflicts: Default::default(),
2430 merge_head_shas: Default::default(),
2431 scan_id: 0,
2432 }
2433 }
2434
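    /// Builds the full `UpdateRepository` message sent when this repository is
    /// first shared with a downstream client.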
2435 fn initial_update(&self, project_id: u64) -> proto::UpdateRepository {
2436 proto::UpdateRepository {
2437 branch_summary: self.branch.as_ref().map(branch_to_proto),
2438 updated_statuses: self
2439 .statuses_by_path
2440 .iter()
2441 .map(|entry| entry.to_proto())
2442 .collect(),
2443 removed_statuses: Default::default(),
2444 current_merge_conflicts: self
2445 .merge_conflicts
2446 .iter()
2447 .map(|repo_path| repo_path.to_proto())
2448 .collect(),
2449 project_id,
2450 id: self.id.to_proto(),
2451 abs_path: self.work_directory_abs_path.to_proto(),
2452 entry_ids: vec![self.id.to_proto()],
2453 scan_id: self.scan_id,
2454 is_last_update: true,
2455 }
2456 }
2457
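    /// Diffs this snapshot against `old` by merging the two path-ordered
    /// status lists, reporting only entries that were added, changed, or
    /// removed.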
2458 fn build_update(&self, old: &Self, project_id: u64) -> proto::UpdateRepository {
2459 let mut updated_statuses: Vec<proto::StatusEntry> = Vec::new();
2460 let mut removed_statuses: Vec<String> = Vec::new();
2461
2462 let mut new_statuses = self.statuses_by_path.iter().peekable();
2463 let mut old_statuses = old.statuses_by_path.iter().peekable();
2464
2465 let mut current_new_entry = new_statuses.next();
2466 let mut current_old_entry = old_statuses.next();
2467 loop {
2468 match (current_new_entry, current_old_entry) {
2469 (Some(new_entry), Some(old_entry)) => {
2470 match new_entry.repo_path.cmp(&old_entry.repo_path) {
2471 Ordering::Less => {
2472 updated_statuses.push(new_entry.to_proto());
2473 current_new_entry = new_statuses.next();
2474 }
2475 Ordering::Equal => {
2476 if new_entry.status != old_entry.status {
2477 updated_statuses.push(new_entry.to_proto());
2478 }
2479 current_old_entry = old_statuses.next();
2480 current_new_entry = new_statuses.next();
2481 }
2482 Ordering::Greater => {
2483 removed_statuses.push(old_entry.repo_path.as_ref().to_proto());
2484 current_old_entry = old_statuses.next();
2485 }
2486 }
2487 }
2488 (None, Some(old_entry)) => {
2489 removed_statuses.push(old_entry.repo_path.as_ref().to_proto());
2490 current_old_entry = old_statuses.next();
2491 }
2492 (Some(new_entry), None) => {
2493 updated_statuses.push(new_entry.to_proto());
2494 current_new_entry = new_statuses.next();
2495 }
2496 (None, None) => break,
2497 }
2498 }
2499
2500 proto::UpdateRepository {
2501 branch_summary: self.branch.as_ref().map(branch_to_proto),
2502 updated_statuses,
2503 removed_statuses,
2504 current_merge_conflicts: self
2505 .merge_conflicts
2506 .iter()
2507 .map(|path| path.as_ref().to_proto())
2508 .collect(),
2509 project_id,
2510 id: self.id.to_proto(),
2511 abs_path: self.work_directory_abs_path.to_proto(),
2512 entry_ids: vec![],
2513 scan_id: self.scan_id,
2514 is_last_update: true,
2515 }
2516 }
2517
2518 pub fn status(&self) -> impl Iterator<Item = StatusEntry> + '_ {
2519 self.statuses_by_path.iter().cloned()
2520 }
2521
2522 pub fn status_summary(&self) -> GitSummary {
2523 self.statuses_by_path.summary().item_summary
2524 }
2525
2526 pub fn status_for_path(&self, path: &RepoPath) -> Option<StatusEntry> {
2527 self.statuses_by_path
2528 .get(&PathKey(path.0.clone()), &())
2529 .cloned()
2530 }
2531
2532 pub fn abs_path_to_repo_path(&self, abs_path: &Path) -> Option<RepoPath> {
2533 abs_path
2534 .strip_prefix(&self.work_directory_abs_path)
2535 .map(RepoPath::from)
2536 .ok()
2537 }
2538
2539 pub fn has_conflict(&self, repo_path: &RepoPath) -> bool {
2540 self.merge_conflicts.contains(repo_path)
2541 }
2542
2543 /// This is the name that will be displayed in the repository selector for this repository.
2544 pub fn display_name(&self) -> SharedString {
2545 self.work_directory_abs_path
2546 .file_name()
2547 .unwrap_or_default()
2548 .to_string_lossy()
2549 .to_string()
2550 .into()
2551 }
2552}
2553
2554impl Repository {
2555 fn local(
2556 id: RepositoryId,
2557 work_directory_abs_path: Arc<Path>,
2558 dot_git_abs_path: Arc<Path>,
2559 repository_dir_abs_path: Arc<Path>,
2560 common_dir_abs_path: Arc<Path>,
2561 project_environment: WeakEntity<ProjectEnvironment>,
2562 fs: Arc<dyn Fs>,
2563 git_store: WeakEntity<GitStore>,
2564 cx: &mut Context<Self>,
2565 ) -> Self {
2566 let snapshot = RepositorySnapshot::empty(id, work_directory_abs_path.clone());
2567 Repository {
2568 this: cx.weak_entity(),
2569 git_store,
2570 snapshot,
2571 commit_message_buffer: None,
2572 askpass_delegates: Default::default(),
2573 paths_needing_status_update: Default::default(),
2574 latest_askpass_id: 0,
2575 job_sender: Repository::spawn_local_git_worker(
2576 work_directory_abs_path,
2577 dot_git_abs_path,
2578 repository_dir_abs_path,
2579 common_dir_abs_path,
2580 project_environment,
2581 fs,
2582 cx,
2583 ),
2584 job_id: 0,
2585 active_jobs: Default::default(),
2586 }
2587 }
2588
2589 fn remote(
2590 id: RepositoryId,
2591 work_directory_abs_path: Arc<Path>,
2592 project_id: ProjectId,
2593 client: AnyProtoClient,
2594 git_store: WeakEntity<GitStore>,
2595 cx: &mut Context<Self>,
2596 ) -> Self {
2597 let snapshot = RepositorySnapshot::empty(id, work_directory_abs_path);
2598 Self {
2599 this: cx.weak_entity(),
2600 snapshot,
2601 commit_message_buffer: None,
2602 git_store,
2603 paths_needing_status_update: Default::default(),
2604 job_sender: Self::spawn_remote_git_worker(project_id, client, cx),
2605 askpass_delegates: Default::default(),
2606 latest_askpass_id: 0,
2607 active_jobs: Default::default(),
2608 job_id: 0,
2609 }
2610 }
2611
2612 pub fn git_store(&self) -> Option<Entity<GitStore>> {
2613 self.git_store.upgrade()
2614 }
2615
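    /// Re-reads the index and HEAD texts for every open buffer in this
    /// repository and pushes any changes to the corresponding diff state (and
    /// to the downstream client, when one is connected).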
2616 fn reload_buffer_diff_bases(&mut self, cx: &mut Context<Self>) {
2617 let this = cx.weak_entity();
2618 let git_store = self.git_store.clone();
2619 let _ = self.send_keyed_job(
2620 Some(GitJobKey::ReloadBufferDiffBases),
2621 None,
2622 |state, mut cx| async move {
2623 let RepositoryState::Local { backend, .. } = state else {
2624 log::error!("tried to recompute diffs for a non-local repository");
2625 return Ok(());
2626 };
2627
2628 let Some(this) = this.upgrade() else {
2629 return Ok(());
2630 };
2631
2632 let repo_diff_state_updates = this.update(&mut cx, |this, cx| {
2633 git_store.update(cx, |git_store, cx| {
2634 git_store
2635 .diffs
2636 .iter()
2637 .filter_map(|(buffer_id, diff_state)| {
2638 let buffer_store = git_store.buffer_store.read(cx);
2639 let buffer = buffer_store.get(*buffer_id)?;
2640 let file = File::from_dyn(buffer.read(cx).file())?;
2641 let abs_path =
2642 file.worktree.read(cx).absolutize(&file.path).ok()?;
2643 let repo_path = this.abs_path_to_repo_path(&abs_path)?;
2644 log::debug!(
2645 "start reload diff bases for repo path {}",
2646 repo_path.0.display()
2647 );
2648 diff_state.update(cx, |diff_state, _| {
2649 let has_unstaged_diff = diff_state
2650 .unstaged_diff
2651 .as_ref()
2652 .is_some_and(|diff| diff.is_upgradable());
2653 let has_uncommitted_diff = diff_state
2654 .uncommitted_diff
2655 .as_ref()
2656 .is_some_and(|set| set.is_upgradable());
2657
2658 Some((
2659 buffer,
2660 repo_path,
2661 has_unstaged_diff.then(|| diff_state.index_text.clone()),
2662 has_uncommitted_diff.then(|| diff_state.head_text.clone()),
2663 ))
2664 })
2665 })
2666 .collect::<Vec<_>>()
2667 })
2668 })??;
2669
2670 let buffer_diff_base_changes = cx
2671 .background_spawn(async move {
2672 let mut changes = Vec::new();
2673 for (buffer, repo_path, current_index_text, current_head_text) in
2674 &repo_diff_state_updates
2675 {
2676 let index_text = if current_index_text.is_some() {
2677 backend.load_index_text(repo_path.clone()).await
2678 } else {
2679 None
2680 };
2681 let head_text = if current_head_text.is_some() {
2682 backend.load_committed_text(repo_path.clone()).await
2683 } else {
2684 None
2685 };
2686
2687 let change =
2688 match (current_index_text.as_ref(), current_head_text.as_ref()) {
2689 (Some(current_index), Some(current_head)) => {
2690 let index_changed =
2691 index_text.as_ref() != current_index.as_deref();
2692 let head_changed =
2693 head_text.as_ref() != current_head.as_deref();
2694 if index_changed && head_changed {
2695 if index_text == head_text {
2696 Some(DiffBasesChange::SetBoth(head_text))
2697 } else {
2698 Some(DiffBasesChange::SetEach {
2699 index: index_text,
2700 head: head_text,
2701 })
2702 }
2703 } else if index_changed {
2704 Some(DiffBasesChange::SetIndex(index_text))
2705 } else if head_changed {
2706 Some(DiffBasesChange::SetHead(head_text))
2707 } else {
2708 None
2709 }
2710 }
2711 (Some(current_index), None) => {
2712 let index_changed =
2713 index_text.as_ref() != current_index.as_deref();
2714 index_changed
2715 .then_some(DiffBasesChange::SetIndex(index_text))
2716 }
2717 (None, Some(current_head)) => {
2718 let head_changed =
2719 head_text.as_ref() != current_head.as_deref();
2720 head_changed.then_some(DiffBasesChange::SetHead(head_text))
2721 }
2722 (None, None) => None,
2723 };
2724
2725 changes.push((buffer.clone(), change))
2726 }
2727 changes
2728 })
2729 .await;
2730
2731 git_store.update(&mut cx, |git_store, cx| {
2732 for (buffer, diff_bases_change) in buffer_diff_base_changes {
2733 let buffer_snapshot = buffer.read(cx).text_snapshot();
2734 let buffer_id = buffer_snapshot.remote_id();
2735 let Some(diff_state) = git_store.diffs.get(&buffer_id) else {
2736 continue;
2737 };
2738
2739 let downstream_client = git_store.downstream_client();
2740 diff_state.update(cx, |diff_state, cx| {
2741 use proto::update_diff_bases::Mode;
2742
2743 if let Some((diff_bases_change, (client, project_id))) =
2744 diff_bases_change.clone().zip(downstream_client)
2745 {
2746 let (staged_text, committed_text, mode) = match diff_bases_change {
2747 DiffBasesChange::SetIndex(index) => {
2748 (index, None, Mode::IndexOnly)
2749 }
2750 DiffBasesChange::SetHead(head) => (None, head, Mode::HeadOnly),
2751 DiffBasesChange::SetEach { index, head } => {
2752 (index, head, Mode::IndexAndHead)
2753 }
2754 DiffBasesChange::SetBoth(text) => {
2755 (None, text, Mode::IndexMatchesHead)
2756 }
2757 };
2758 client
2759 .send(proto::UpdateDiffBases {
2760 project_id: project_id.to_proto(),
2761 buffer_id: buffer_id.to_proto(),
2762 staged_text,
2763 committed_text,
2764 mode: mode as i32,
2765 })
2766 .log_err();
2767 }
2768
2769 diff_state.diff_bases_changed(buffer_snapshot, diff_bases_change, cx);
2770 });
2771 }
2772 })
2773 },
2774 );
2775 }
2776
2777 pub fn send_job<F, Fut, R>(
2778 &mut self,
2779 status: Option<SharedString>,
2780 job: F,
2781 ) -> oneshot::Receiver<R>
2782 where
2783 F: FnOnce(RepositoryState, AsyncApp) -> Fut + 'static,
2784 Fut: Future<Output = R> + 'static,
2785 R: Send + 'static,
2786 {
2787 self.send_keyed_job(None, status, job)
2788 }
2789
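    /// Enqueues a job on the repository's worker. When a key is provided, the
    /// worker drops any queued job whose key is duplicated by a newer entry,
    /// so only the most recent job for that key actually runs.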
2790 fn send_keyed_job<F, Fut, R>(
2791 &mut self,
2792 key: Option<GitJobKey>,
2793 status: Option<SharedString>,
2794 job: F,
2795 ) -> oneshot::Receiver<R>
2796 where
2797 F: FnOnce(RepositoryState, AsyncApp) -> Fut + 'static,
2798 Fut: Future<Output = R> + 'static,
2799 R: Send + 'static,
2800 {
2801 let (result_tx, result_rx) = futures::channel::oneshot::channel();
2802 let job_id = post_inc(&mut self.job_id);
2803 let this = self.this.clone();
2804 self.job_sender
2805 .unbounded_send(GitJob {
2806 key,
2807 job: Box::new(move |state, cx: &mut AsyncApp| {
2808 let job = job(state, cx.clone());
2809 cx.spawn(async move |cx| {
2810 if let Some(s) = status.clone() {
2811 this.update(cx, |this, cx| {
2812 this.active_jobs.insert(
2813 job_id,
2814 JobInfo {
2815 start: Instant::now(),
2816 message: s.clone(),
2817 },
2818 );
2819
2820 cx.notify();
2821 })
2822 .ok();
2823 }
2824 let result = job.await;
2825
2826 this.update(cx, |this, cx| {
2827 this.active_jobs.remove(&job_id);
2828 cx.notify();
2829 })
2830 .ok();
2831
2832 result_tx.send(result).ok();
2833 })
2834 }),
2835 })
2836 .ok();
2837 result_rx
2838 }
2839
2840 pub fn set_as_active_repository(&self, cx: &mut Context<Self>) {
2841 let Some(git_store) = self.git_store.upgrade() else {
2842 return;
2843 };
2844 let entity = cx.entity();
2845 git_store.update(cx, |git_store, cx| {
2846 let Some((&id, _)) = git_store
2847 .repositories
2848 .iter()
2849 .find(|(_, handle)| *handle == &entity)
2850 else {
2851 return;
2852 };
2853 git_store.active_repo_id = Some(id);
2854 cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
2855 });
2856 }
2857
2858 pub fn cached_status(&self) -> impl '_ + Iterator<Item = StatusEntry> {
2859 self.snapshot.status()
2860 }
2861
2862 pub fn repo_path_to_project_path(&self, path: &RepoPath, cx: &App) -> Option<ProjectPath> {
2863 let git_store = self.git_store.upgrade()?;
2864 let worktree_store = git_store.read(cx).worktree_store.read(cx);
2865 let abs_path = self.snapshot.work_directory_abs_path.join(&path.0);
2866 let (worktree, relative_path) = worktree_store.find_worktree(abs_path, cx)?;
2867 Some(ProjectPath {
2868 worktree_id: worktree.read(cx).id(),
2869 path: relative_path.into(),
2870 })
2871 }
2872
2873 pub fn project_path_to_repo_path(&self, path: &ProjectPath, cx: &App) -> Option<RepoPath> {
2874 let git_store = self.git_store.upgrade()?;
2875 let worktree_store = git_store.read(cx).worktree_store.read(cx);
2876 let abs_path = worktree_store.absolutize(path, cx)?;
2877 self.snapshot.abs_path_to_repo_path(&abs_path)
2878 }
2879
2880 pub fn contains_sub_repo(&self, other: &Entity<Self>, cx: &App) -> bool {
2881 other
2882 .read(cx)
2883 .snapshot
2884 .work_directory_abs_path
2885 .starts_with(&self.snapshot.work_directory_abs_path)
2886 }
2887
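    /// Returns the commit message buffer for this repository, creating it
    /// locally or requesting it from the remote host the first time it is
    /// needed.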
2888 pub fn open_commit_buffer(
2889 &mut self,
2890 languages: Option<Arc<LanguageRegistry>>,
2891 buffer_store: Entity<BufferStore>,
2892 cx: &mut Context<Self>,
2893 ) -> Task<Result<Entity<Buffer>>> {
2894 let id = self.id;
2895 if let Some(buffer) = self.commit_message_buffer.clone() {
2896 return Task::ready(Ok(buffer));
2897 }
2898 let this = cx.weak_entity();
2899
2900 let rx = self.send_job(None, move |state, mut cx| async move {
2901 let Some(this) = this.upgrade() else {
2902 bail!("git store was dropped");
2903 };
2904 match state {
2905 RepositoryState::Local { .. } => {
2906 this.update(&mut cx, |_, cx| {
2907 Self::open_local_commit_buffer(languages, buffer_store, cx)
2908 })?
2909 .await
2910 }
2911 RepositoryState::Remote { project_id, client } => {
2912 let request = client.request(proto::OpenCommitMessageBuffer {
2913 project_id: project_id.0,
2914 repository_id: id.to_proto(),
2915 });
2916 let response = request.await.context("requesting to open commit buffer")?;
2917 let buffer_id = BufferId::new(response.buffer_id)?;
2918 let buffer = buffer_store
2919 .update(&mut cx, |buffer_store, cx| {
2920 buffer_store.wait_for_remote_buffer(buffer_id, cx)
2921 })?
2922 .await?;
2923 if let Some(language_registry) = languages {
2924 let git_commit_language =
2925 language_registry.language_for_name("Git Commit").await?;
2926 buffer.update(&mut cx, |buffer, cx| {
2927 buffer.set_language(Some(git_commit_language), cx);
2928 })?;
2929 }
2930 this.update(&mut cx, |this, _| {
2931 this.commit_message_buffer = Some(buffer.clone());
2932 })?;
2933 Ok(buffer)
2934 }
2935 }
2936 });
2937
2938 cx.spawn(|_, _: &mut AsyncApp| async move { rx.await? })
2939 }
2940
2941 fn open_local_commit_buffer(
2942 language_registry: Option<Arc<LanguageRegistry>>,
2943 buffer_store: Entity<BufferStore>,
2944 cx: &mut Context<Self>,
2945 ) -> Task<Result<Entity<Buffer>>> {
2946 cx.spawn(async move |repository, cx| {
2947 let buffer = buffer_store
2948 .update(cx, |buffer_store, cx| buffer_store.create_buffer(cx))?
2949 .await?;
2950
2951 if let Some(language_registry) = language_registry {
2952 let git_commit_language = language_registry.language_for_name("Git Commit").await?;
2953 buffer.update(cx, |buffer, cx| {
2954 buffer.set_language(Some(git_commit_language), cx);
2955 })?;
2956 }
2957
2958 repository.update(cx, |repository, _| {
2959 repository.commit_message_buffer = Some(buffer.clone());
2960 })?;
2961 Ok(buffer)
2962 })
2963 }
2964
2965 pub fn checkout_files(
2966 &mut self,
2967 commit: &str,
2968 paths: Vec<RepoPath>,
2969 _cx: &mut App,
2970 ) -> oneshot::Receiver<Result<()>> {
2971 let commit = commit.to_string();
2972 let id = self.id;
2973
2974 self.send_job(
2975 Some(format!("git checkout {}", commit).into()),
2976 move |git_repo, _| async move {
2977 match git_repo {
2978 RepositoryState::Local {
2979 backend,
2980 environment,
2981 ..
2982 } => {
2983 backend
2984 .checkout_files(commit, paths, environment.clone())
2985 .await
2986 }
2987 RepositoryState::Remote { project_id, client } => {
2988 client
2989 .request(proto::GitCheckoutFiles {
2990 project_id: project_id.0,
2991 repository_id: id.to_proto(),
2992 commit,
2993 paths: paths
2994 .into_iter()
2995 .map(|p| p.to_string_lossy().to_string())
2996 .collect(),
2997 })
2998 .await?;
2999
3000 Ok(())
3001 }
3002 }
3003 },
3004 )
3005 }
3006
3007 pub fn reset(
3008 &mut self,
3009 commit: String,
3010 reset_mode: ResetMode,
3011 _cx: &mut App,
3012 ) -> oneshot::Receiver<Result<()>> {
3014 let id = self.id;
3015
3016 self.send_job(None, move |git_repo, _| async move {
3017 match git_repo {
3018 RepositoryState::Local {
3019 backend,
3020 environment,
3021 ..
3022 } => backend.reset(commit, reset_mode, environment).await,
3023 RepositoryState::Remote { project_id, client } => {
3024 client
3025 .request(proto::GitReset {
3026 project_id: project_id.0,
3027 repository_id: id.to_proto(),
3028 commit,
3029 mode: match reset_mode {
3030 ResetMode::Soft => git_reset::ResetMode::Soft.into(),
3031 ResetMode::Mixed => git_reset::ResetMode::Mixed.into(),
3032 },
3033 })
3034 .await?;
3035
3036 Ok(())
3037 }
3038 }
3039 })
3040 }
3041
3042 pub fn show(&mut self, commit: String) -> oneshot::Receiver<Result<CommitDetails>> {
3043 let id = self.id;
3044 self.send_job(None, move |git_repo, _cx| async move {
3045 match git_repo {
3046 RepositoryState::Local { backend, .. } => backend.show(commit).await,
3047 RepositoryState::Remote { project_id, client } => {
3048 let resp = client
3049 .request(proto::GitShow {
3050 project_id: project_id.0,
3051 repository_id: id.to_proto(),
3052 commit,
3053 })
3054 .await?;
3055
3056 Ok(CommitDetails {
3057 sha: resp.sha.into(),
3058 message: resp.message.into(),
3059 commit_timestamp: resp.commit_timestamp,
3060 author_email: resp.author_email.into(),
3061 author_name: resp.author_name.into(),
3062 })
3063 }
3064 }
3065 })
3066 }
3067
3068 pub fn load_commit_diff(&mut self, commit: String) -> oneshot::Receiver<Result<CommitDiff>> {
3069 let id = self.id;
3070 self.send_job(None, move |git_repo, cx| async move {
3071 match git_repo {
3072 RepositoryState::Local { backend, .. } => backend.load_commit(commit, cx).await,
3073 RepositoryState::Remote {
3074 client, project_id, ..
3075 } => {
3076 let response = client
3077 .request(proto::LoadCommitDiff {
3078 project_id: project_id.0,
3079 repository_id: id.to_proto(),
3080 commit,
3081 })
3082 .await?;
3083 Ok(CommitDiff {
3084 files: response
3085 .files
3086 .into_iter()
3087 .map(|file| CommitFile {
3088 path: Path::new(&file.path).into(),
3089 old_text: file.old_text,
3090 new_text: file.new_text,
3091 })
3092 .collect(),
3093 })
3094 }
3095 }
3096 })
3097 }
3098
3099 fn buffer_store(&self, cx: &App) -> Option<Entity<BufferStore>> {
3100 Some(self.git_store.upgrade()?.read(cx).buffer_store.clone())
3101 }
3102
3103 pub fn stage_entries(
3104 &self,
3105 entries: Vec<RepoPath>,
3106 cx: &mut Context<Self>,
3107 ) -> Task<anyhow::Result<()>> {
3108 if entries.is_empty() {
3109 return Task::ready(Ok(()));
3110 }
3111 let id = self.id;
3112
3113 let mut save_futures = Vec::new();
3114 if let Some(buffer_store) = self.buffer_store(cx) {
3115 buffer_store.update(cx, |buffer_store, cx| {
3116 for path in &entries {
3117 let Some(project_path) = self.repo_path_to_project_path(path, cx) else {
3118 continue;
3119 };
3120 if let Some(buffer) = buffer_store.get_by_path(&project_path, cx) {
3121 if buffer
3122 .read(cx)
3123 .file()
3124 .map_or(false, |file| file.disk_state().exists())
3125 {
3126 save_futures.push(buffer_store.save_buffer(buffer, cx));
3127 }
3128 }
3129 }
3130 })
3131 }
3132
3133 cx.spawn(async move |this, cx| {
3134 for save_future in save_futures {
3135 save_future.await?;
3136 }
3137
3138 this.update(cx, |this, _| {
3139 this.send_job(None, move |git_repo, _cx| async move {
3140 match git_repo {
3141 RepositoryState::Local {
3142 backend,
3143 environment,
3144 ..
3145 } => backend.stage_paths(entries, environment.clone()).await,
3146 RepositoryState::Remote { project_id, client } => {
3147 client
3148 .request(proto::Stage {
3149 project_id: project_id.0,
3150 repository_id: id.to_proto(),
3151 paths: entries
3152 .into_iter()
3153 .map(|repo_path| repo_path.as_ref().to_proto())
3154 .collect(),
3155 })
3156 .await
3157 .context("sending stage request")?;
3158
3159 Ok(())
3160 }
3161 }
3162 })
3163 })?
3164 .await??;
3165
3166 Ok(())
3167 })
3168 }
3169
3170 pub fn unstage_entries(
3171 &self,
3172 entries: Vec<RepoPath>,
3173 cx: &mut Context<Self>,
3174 ) -> Task<anyhow::Result<()>> {
3175 if entries.is_empty() {
3176 return Task::ready(Ok(()));
3177 }
3178 let id = self.id;
3179
3180 let mut save_futures = Vec::new();
3181 if let Some(buffer_store) = self.buffer_store(cx) {
3182 buffer_store.update(cx, |buffer_store, cx| {
3183 for path in &entries {
3184 let Some(project_path) = self.repo_path_to_project_path(path, cx) else {
3185 continue;
3186 };
3187 if let Some(buffer) = buffer_store.get_by_path(&project_path, cx) {
3188 if buffer
3189 .read(cx)
3190 .file()
3191 .map_or(false, |file| file.disk_state().exists())
3192 {
3193 save_futures.push(buffer_store.save_buffer(buffer, cx));
3194 }
3195 }
3196 }
3197 })
3198 }
3199
3200 cx.spawn(async move |this, cx| {
3201 for save_future in save_futures {
3202 save_future.await?;
3203 }
3204
3205 this.update(cx, |this, _| {
3206 this.send_job(None, move |git_repo, _cx| async move {
3207 match git_repo {
3208 RepositoryState::Local {
3209 backend,
3210 environment,
3211 ..
3212 } => backend.unstage_paths(entries, environment).await,
3213 RepositoryState::Remote { project_id, client } => {
3214 client
3215 .request(proto::Unstage {
3216 project_id: project_id.0,
3217 repository_id: id.to_proto(),
3218 paths: entries
3219 .into_iter()
3220 .map(|repo_path| repo_path.as_ref().to_proto())
3221 .collect(),
3222 })
3223 .await
3224 .context("sending unstage request")?;
3225
3226 Ok(())
3227 }
3228 }
3229 })
3230 })?
3231 .await??;
3232
3233 Ok(())
3234 })
3235 }
3236
3237 pub fn stage_all(&self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
3238 let to_stage = self
3239 .cached_status()
3240 .filter(|entry| !entry.status.staging().is_fully_staged())
3241 .map(|entry| entry.repo_path.clone())
3242 .collect();
3243 self.stage_entries(to_stage, cx)
3244 }
3245
3246 pub fn unstage_all(&self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
3247 let to_unstage = self
3248 .cached_status()
3249 .filter(|entry| entry.status.staging().has_staged())
3250 .map(|entry| entry.repo_path.clone())
3251 .collect();
3252 self.unstage_entries(to_unstage, cx)
3253 }
3254
3255 pub fn commit(
3256 &mut self,
3257 message: SharedString,
3258 name_and_email: Option<(SharedString, SharedString)>,
3259 options: CommitOptions,
3260 _cx: &mut App,
3261 ) -> oneshot::Receiver<Result<()>> {
3262 let id = self.id;
3263
3264 self.send_job(Some("git commit".into()), move |git_repo, _cx| async move {
3265 match git_repo {
3266 RepositoryState::Local {
3267 backend,
3268 environment,
3269 ..
3270 } => {
3271 backend
3272 .commit(message, name_and_email, options, environment)
3273 .await
3274 }
3275 RepositoryState::Remote { project_id, client } => {
3276 let (name, email) = name_and_email.unzip();
3277 client
3278 .request(proto::Commit {
3279 project_id: project_id.0,
3280 repository_id: id.to_proto(),
3281 message: String::from(message),
3282 name: name.map(String::from),
3283 email: email.map(String::from),
3284 options: Some(proto::commit::CommitOptions {
3285 amend: options.amend,
3286 }),
3287 })
3288 .await
3289 .context("sending commit request")?;
3290
3291 Ok(())
3292 }
3293 }
3294 })
3295 }
3296
3297 pub fn fetch(
3298 &mut self,
3299 askpass: AskPassDelegate,
3300 _cx: &mut App,
3301 ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
3302 let askpass_delegates = self.askpass_delegates.clone();
3303 let askpass_id = util::post_inc(&mut self.latest_askpass_id);
3304 let id = self.id;
3305
3306 self.send_job(Some("git fetch".into()), move |git_repo, cx| async move {
3307 match git_repo {
3308 RepositoryState::Local {
3309 backend,
3310 environment,
3311 ..
3312 } => backend.fetch(askpass, environment, cx).await,
3313 RepositoryState::Remote { project_id, client } => {
3314 askpass_delegates.lock().insert(askpass_id, askpass);
3315 let _defer = util::defer(|| {
3316 let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
3317 debug_assert!(askpass_delegate.is_some());
3318 });
3319
3320 let response = client
3321 .request(proto::Fetch {
3322 project_id: project_id.0,
3323 repository_id: id.to_proto(),
3324 askpass_id,
3325 })
3326 .await
3327 .context("sending fetch request")?;
3328
3329 Ok(RemoteCommandOutput {
3330 stdout: response.stdout,
3331 stderr: response.stderr,
3332 })
3333 }
3334 }
3335 })
3336 }
3337
3338 pub fn push(
3339 &mut self,
3340 branch: SharedString,
3341 remote: SharedString,
3342 options: Option<PushOptions>,
3343 askpass: AskPassDelegate,
3344 cx: &mut Context<Self>,
3345 ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
3346 let askpass_delegates = self.askpass_delegates.clone();
3347 let askpass_id = util::post_inc(&mut self.latest_askpass_id);
3348 let id = self.id;
3349
3350 let args = options
3351 .map(|option| match option {
3352 PushOptions::SetUpstream => " --set-upstream",
3353 PushOptions::Force => " --force",
3354 })
3355 .unwrap_or("");
3356
3357 let updates_tx = self
3358 .git_store()
3359 .and_then(|git_store| match &git_store.read(cx).state {
3360 GitStoreState::Local { downstream, .. } => downstream
3361 .as_ref()
3362 .map(|downstream| downstream.updates_tx.clone()),
3363 _ => None,
3364 });
3365
3366 let this = cx.weak_entity();
3367 self.send_job(
3368            Some(format!("git push{} {} {}", args, remote, branch).into()),
3369 move |git_repo, mut cx| async move {
3370 match git_repo {
3371 RepositoryState::Local {
3372 backend,
3373 environment,
3374 ..
3375 } => {
3376 let result = backend
3377 .push(
3378 branch.to_string(),
3379 remote.to_string(),
3380 options,
3381 askpass,
3382 environment.clone(),
3383 cx.clone(),
3384 )
3385 .await;
3386 if result.is_ok() {
3387 let branches = backend.branches().await?;
3388 let branch = branches.into_iter().find(|branch| branch.is_head);
3389 log::info!("head branch after scan is {branch:?}");
3390 let snapshot = this.update(&mut cx, |this, cx| {
3391 this.snapshot.branch = branch;
3392 let snapshot = this.snapshot.clone();
3393 cx.emit(RepositoryEvent::Updated { full_scan: false });
3394 snapshot
3395 })?;
3396 if let Some(updates_tx) = updates_tx {
3397 updates_tx
3398 .unbounded_send(DownstreamUpdate::UpdateRepository(snapshot))
3399 .ok();
3400 }
3401 }
3402 result
3403 }
3404 RepositoryState::Remote { project_id, client } => {
3405 askpass_delegates.lock().insert(askpass_id, askpass);
3406 let _defer = util::defer(|| {
3407 let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
3408 debug_assert!(askpass_delegate.is_some());
3409 });
3410 let response = client
3411 .request(proto::Push {
3412 project_id: project_id.0,
3413 repository_id: id.to_proto(),
3414 askpass_id,
3415 branch_name: branch.to_string(),
3416 remote_name: remote.to_string(),
3417 options: options.map(|options| match options {
3418 PushOptions::Force => proto::push::PushOptions::Force,
3419 PushOptions::SetUpstream => {
3420 proto::push::PushOptions::SetUpstream
3421 }
3422 }
3423 as i32),
3424 })
3425 .await
3426 .context("sending push request")?;
3427
3428 Ok(RemoteCommandOutput {
3429 stdout: response.stdout,
3430 stderr: response.stderr,
3431 })
3432 }
3433 }
3434 },
3435 )
3436 }
3437
3438 pub fn pull(
3439 &mut self,
3440 branch: SharedString,
3441 remote: SharedString,
3442 askpass: AskPassDelegate,
3443 _cx: &mut App,
3444 ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
3445 let askpass_delegates = self.askpass_delegates.clone();
3446 let askpass_id = util::post_inc(&mut self.latest_askpass_id);
3447 let id = self.id;
3448
3449 self.send_job(
3450 Some(format!("git pull {} {}", remote, branch).into()),
3451 move |git_repo, cx| async move {
3452 match git_repo {
3453 RepositoryState::Local {
3454 backend,
3455 environment,
3456 ..
3457 } => {
3458 backend
3459 .pull(
3460 branch.to_string(),
3461 remote.to_string(),
3462 askpass,
3463 environment.clone(),
3464 cx,
3465 )
3466 .await
3467 }
3468 RepositoryState::Remote { project_id, client } => {
3469 askpass_delegates.lock().insert(askpass_id, askpass);
3470 let _defer = util::defer(|| {
3471 let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
3472 debug_assert!(askpass_delegate.is_some());
3473 });
3474 let response = client
3475 .request(proto::Pull {
3476 project_id: project_id.0,
3477 repository_id: id.to_proto(),
3478 askpass_id,
3479 branch_name: branch.to_string(),
3480 remote_name: remote.to_string(),
3481 })
3482 .await
3483 .context("sending pull request")?;
3484
3485 Ok(RemoteCommandOutput {
3486 stdout: response.stdout,
3487 stderr: response.stderr,
3488 })
3489 }
3490 }
3491 },
3492 )
3493 }
3494
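    /// Writes new contents for `path` into the index. The job is keyed by
    /// path so that rapid successive writes to the same entry coalesce; on
    /// completion it records which hunk staging operation the write satisfied.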
3495 fn spawn_set_index_text_job(
3496 &mut self,
3497 path: RepoPath,
3498 content: Option<String>,
3499 hunk_staging_operation_count: Option<usize>,
3500 cx: &mut Context<Self>,
3501 ) -> oneshot::Receiver<anyhow::Result<()>> {
3502 let id = self.id;
3503 let this = cx.weak_entity();
3504 let git_store = self.git_store.clone();
3505 self.send_keyed_job(
3506 Some(GitJobKey::WriteIndex(path.clone())),
3507 None,
3508 move |git_repo, mut cx| async move {
3509 log::debug!("start updating index text for buffer {}", path.display());
3510 match git_repo {
3511 RepositoryState::Local {
3512 backend,
3513 environment,
3514 ..
3515 } => {
3516 backend
3517 .set_index_text(path.clone(), content, environment.clone())
3518 .await?;
3519 }
3520 RepositoryState::Remote { project_id, client } => {
3521 client
3522 .request(proto::SetIndexText {
3523 project_id: project_id.0,
3524 repository_id: id.to_proto(),
3525 path: path.as_ref().to_proto(),
3526 text: content,
3527 })
3528 .await?;
3529 }
3530 }
3531 log::debug!("finish updating index text for buffer {}", path.display());
3532
3533 if let Some(hunk_staging_operation_count) = hunk_staging_operation_count {
3534 let project_path = this
3535 .read_with(&cx, |this, cx| this.repo_path_to_project_path(&path, cx))
3536 .ok()
3537 .flatten();
3538 git_store.update(&mut cx, |git_store, cx| {
3539 let buffer_id = git_store
3540 .buffer_store
3541 .read(cx)
3542 .get_by_path(&project_path?, cx)?
3543 .read(cx)
3544 .remote_id();
3545 let diff_state = git_store.diffs.get(&buffer_id)?;
3546 diff_state.update(cx, |diff_state, _| {
3547 diff_state.hunk_staging_operation_count_as_of_write =
3548 hunk_staging_operation_count;
3549 });
3550 Some(())
3551 })?;
3552 }
3553 Ok(())
3554 },
3555 )
3556 }
3557
3558 pub fn get_remotes(
3559 &mut self,
3560 branch_name: Option<String>,
3561 ) -> oneshot::Receiver<Result<Vec<Remote>>> {
3562 let id = self.id;
3563 self.send_job(None, move |repo, _cx| async move {
3564 match repo {
3565 RepositoryState::Local { backend, .. } => backend.get_remotes(branch_name).await,
3566 RepositoryState::Remote { project_id, client } => {
3567 let response = client
3568 .request(proto::GetRemotes {
3569 project_id: project_id.0,
3570 repository_id: id.to_proto(),
3571 branch_name,
3572 })
3573 .await?;
3574
3575 let remotes = response
3576 .remotes
3577 .into_iter()
3578                        .map(|remote| git::repository::Remote {
3579                            name: remote.name.into(),
3580 })
3581 .collect();
3582
3583 Ok(remotes)
3584 }
3585 }
3586 })
3587 }
3588
3589 pub fn branches(&mut self) -> oneshot::Receiver<Result<Vec<Branch>>> {
3590 let id = self.id;
3591 self.send_job(None, move |repo, cx| async move {
3592 match repo {
3593 RepositoryState::Local { backend, .. } => {
3594 let backend = backend.clone();
3595 cx.background_spawn(async move { backend.branches().await })
3596 .await
3597 }
3598 RepositoryState::Remote { project_id, client } => {
3599 let response = client
3600 .request(proto::GitGetBranches {
3601 project_id: project_id.0,
3602 repository_id: id.to_proto(),
3603 })
3604 .await?;
3605
3606 let branches = response
3607 .branches
3608 .into_iter()
3609 .map(|branch| proto_to_branch(&branch))
3610 .collect();
3611
3612 Ok(branches)
3613 }
3614 }
3615 })
3616 }
3617
3618 pub fn diff(&mut self, diff_type: DiffType, _cx: &App) -> oneshot::Receiver<Result<String>> {
3619 let id = self.id;
3620 self.send_job(None, move |repo, _cx| async move {
3621 match repo {
3622 RepositoryState::Local { backend, .. } => backend.diff(diff_type).await,
3623 RepositoryState::Remote { project_id, client } => {
3624 let response = client
3625 .request(proto::GitDiff {
3626 project_id: project_id.0,
3627 repository_id: id.to_proto(),
3628 diff_type: match diff_type {
3629 DiffType::HeadToIndex => {
3630 proto::git_diff::DiffType::HeadToIndex.into()
3631 }
3632 DiffType::HeadToWorktree => {
3633 proto::git_diff::DiffType::HeadToWorktree.into()
3634 }
3635 },
3636 })
3637 .await?;
3638
3639 Ok(response.diff)
3640 }
3641 }
3642 })
3643 }
3644
3645 pub fn create_branch(&mut self, branch_name: String) -> oneshot::Receiver<Result<()>> {
3646 let id = self.id;
3647 self.send_job(
3648 Some(format!("git switch -c {branch_name}").into()),
3649 move |repo, _cx| async move {
3650 match repo {
3651 RepositoryState::Local { backend, .. } => {
3652 backend.create_branch(branch_name).await
3653 }
3654 RepositoryState::Remote { project_id, client } => {
3655 client
3656 .request(proto::GitCreateBranch {
3657 project_id: project_id.0,
3658 repository_id: id.to_proto(),
3659 branch_name,
3660 })
3661 .await?;
3662
3663 Ok(())
3664 }
3665 }
3666 },
3667 )
3668 }
3669
3670 pub fn change_branch(&mut self, branch_name: String) -> oneshot::Receiver<Result<()>> {
3671 let id = self.id;
3672 self.send_job(
3673 Some(format!("git switch {branch_name}").into()),
3674 move |repo, _cx| async move {
3675 match repo {
3676 RepositoryState::Local { backend, .. } => {
3677 backend.change_branch(branch_name).await
3678 }
3679 RepositoryState::Remote { project_id, client } => {
3680 client
3681 .request(proto::GitChangeBranch {
3682 project_id: project_id.0,
3683 repository_id: id.to_proto(),
3684 branch_name,
3685 })
3686 .await?;
3687
3688 Ok(())
3689 }
3690 }
3691 },
3692 )
3693 }
3694
3695 pub fn check_for_pushed_commits(&mut self) -> oneshot::Receiver<Result<Vec<SharedString>>> {
3696 let id = self.id;
3697 self.send_job(None, move |repo, _cx| async move {
3698 match repo {
3699 RepositoryState::Local { backend, .. } => backend.check_for_pushed_commit().await,
3700 RepositoryState::Remote { project_id, client } => {
3701 let response = client
3702 .request(proto::CheckForPushedCommits {
3703 project_id: project_id.0,
3704 repository_id: id.to_proto(),
3705 })
3706 .await?;
3707
3708 let branches = response.pushed_to.into_iter().map(Into::into).collect();
3709
3710 Ok(branches)
3711 }
3712 }
3713 })
3714 }
3715
3716 pub fn checkpoint(&mut self) -> oneshot::Receiver<Result<GitRepositoryCheckpoint>> {
3717 self.send_job(None, |repo, _cx| async move {
3718 match repo {
3719 RepositoryState::Local { backend, .. } => backend.checkpoint().await,
3720 RepositoryState::Remote { .. } => Err(anyhow!("not implemented yet")),
3721 }
3722 })
3723 }
3724
3725 pub fn restore_checkpoint(
3726 &mut self,
3727 checkpoint: GitRepositoryCheckpoint,
3728 ) -> oneshot::Receiver<Result<()>> {
3729 self.send_job(None, move |repo, _cx| async move {
3730 match repo {
3731 RepositoryState::Local { backend, .. } => {
3732 backend.restore_checkpoint(checkpoint).await
3733 }
3734 RepositoryState::Remote { .. } => Err(anyhow!("not implemented yet")),
3735 }
3736 })
3737 }
3738
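    /// Applies an `UpdateRepository` message received from the host, replacing
    /// the branch and merge-conflict state and editing the status tree in
    /// place.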
3739 pub(crate) fn apply_remote_update(
3740 &mut self,
3741 update: proto::UpdateRepository,
3742 cx: &mut Context<Self>,
3743 ) -> Result<()> {
3744 let conflicted_paths = TreeSet::from_ordered_entries(
3745 update
3746 .current_merge_conflicts
3747 .into_iter()
3748 .map(|path| RepoPath(Path::new(&path).into())),
3749 );
3750 self.snapshot.branch = update.branch_summary.as_ref().map(proto_to_branch);
3751 self.snapshot.merge_conflicts = conflicted_paths;
3752
3753 let edits = update
3754 .removed_statuses
3755 .into_iter()
3756 .map(|path| sum_tree::Edit::Remove(PathKey(FromProto::from_proto(path))))
3757 .chain(
3758 update
3759 .updated_statuses
3760 .into_iter()
3761 .filter_map(|updated_status| {
3762 Some(sum_tree::Edit::Insert(updated_status.try_into().log_err()?))
3763 }),
3764 )
3765 .collect::<Vec<_>>();
3766 self.snapshot.statuses_by_path.edit(edits, &());
3767 if update.is_last_update {
3768 self.snapshot.scan_id = update.scan_id;
3769 }
3770 cx.emit(RepositoryEvent::Updated { full_scan: true });
3771 Ok(())
3772 }
3773
3774 pub fn compare_checkpoints(
3775 &mut self,
3776 left: GitRepositoryCheckpoint,
3777 right: GitRepositoryCheckpoint,
3778 ) -> oneshot::Receiver<Result<bool>> {
3779 self.send_job(None, move |repo, _cx| async move {
3780 match repo {
3781 RepositoryState::Local { backend, .. } => {
3782 backend.compare_checkpoints(left, right).await
3783 }
3784 RepositoryState::Remote { .. } => Err(anyhow!("not implemented yet")),
3785 }
3786 })
3787 }
3788
3789 pub fn delete_checkpoint(
3790 &mut self,
3791 checkpoint: GitRepositoryCheckpoint,
3792 ) -> oneshot::Receiver<Result<()>> {
3793 self.send_job(None, move |repo, _cx| async move {
3794 match repo {
3795 RepositoryState::Local { backend, .. } => {
3796 backend.delete_checkpoint(checkpoint).await
3797 }
3798 RepositoryState::Remote { .. } => Err(anyhow!("not implemented yet")),
3799 }
3800 })
3801 }
3802
3803 pub fn diff_checkpoints(
3804 &mut self,
3805 base_checkpoint: GitRepositoryCheckpoint,
3806 target_checkpoint: GitRepositoryCheckpoint,
3807 ) -> oneshot::Receiver<Result<String>> {
3808 self.send_job(None, move |repo, _cx| async move {
3809 match repo {
3810 RepositoryState::Local { backend, .. } => {
3811 backend
3812 .diff_checkpoints(base_checkpoint, target_checkpoint)
3813 .await
3814 }
3815 RepositoryState::Remote { .. } => Err(anyhow!("not implemented yet")),
3816 }
3817 })
3818 }
3819
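    /// Queues a rescan of the local repository's git state; when it completes,
    /// the new snapshot is applied, events are emitted, and the snapshot is
    /// forwarded downstream if a client is connected.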
3820 fn schedule_scan(
3821 &mut self,
3822 updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
3823 cx: &mut Context<Self>,
3824 ) {
3825 let this = cx.weak_entity();
3826 let _ = self.send_keyed_job(
3827 Some(GitJobKey::ReloadGitState),
3828 None,
3829 |state, mut cx| async move {
3830 let Some(this) = this.upgrade() else {
3831 return Ok(());
3832 };
3833 let RepositoryState::Local { backend, .. } = state else {
3834 bail!("not a local repository")
3835 };
3836 let (snapshot, events) = this
3837 .update(&mut cx, |this, _| {
3838 compute_snapshot(
3839 this.id,
3840 this.work_directory_abs_path.clone(),
3841 this.snapshot.clone(),
3842 backend.clone(),
3843 )
3844 })?
3845 .await?;
3846 this.update(&mut cx, |this, cx| {
3847 this.snapshot = snapshot.clone();
3848 for event in events {
3849 cx.emit(event);
3850 }
3851 })?;
3852 if let Some(updates_tx) = updates_tx {
3853 updates_tx
3854 .unbounded_send(DownstreamUpdate::UpdateRepository(snapshot))
3855 .ok();
3856 }
3857 Ok(())
3858 },
3859 );
3860 }
3861
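    /// Spawns the background worker that owns the local git backend: it
    /// resolves the working directory environment, opens the repository, and
    /// then drains queued jobs one at a time.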
3862 fn spawn_local_git_worker(
3863 work_directory_abs_path: Arc<Path>,
3864 dot_git_abs_path: Arc<Path>,
3865 _repository_dir_abs_path: Arc<Path>,
3866 _common_dir_abs_path: Arc<Path>,
3867 project_environment: WeakEntity<ProjectEnvironment>,
3868 fs: Arc<dyn Fs>,
3869 cx: &mut Context<Self>,
3870 ) -> mpsc::UnboundedSender<GitJob> {
3871 let (job_tx, mut job_rx) = mpsc::unbounded::<GitJob>();
3872
3873 cx.spawn(async move |_, cx| {
3874 let environment = project_environment
3875 .upgrade()
3876 .ok_or_else(|| anyhow!("missing project environment"))?
3877 .update(cx, |project_environment, cx| {
3878 project_environment.get_directory_environment(work_directory_abs_path.clone(), cx)
3879 })?
3880 .await
3881 .unwrap_or_else(|| {
3882 log::error!("failed to get working directory environment for repository {work_directory_abs_path:?}");
3883 HashMap::default()
3884 });
3885 let backend = cx
3886 .background_spawn(async move {
3887 fs.open_repo(&dot_git_abs_path)
3888 .ok_or_else(|| anyhow!("failed to build repository"))
3889 })
3890 .await?;
3891
3892 if let Some(git_hosting_provider_registry) =
3893 cx.update(|cx| GitHostingProviderRegistry::try_global(cx))?
3894 {
3895 git_hosting_providers::register_additional_providers(
3896 git_hosting_provider_registry,
3897 backend.clone(),
3898 );
3899 }
3900
3901 let state = RepositoryState::Local {
3902 backend,
3903 environment: Arc::new(environment),
3904 };
3905 let mut jobs = VecDeque::new();
3906 loop {
3907 while let Ok(Some(next_job)) = job_rx.try_next() {
3908 jobs.push_back(next_job);
3909 }
3910
3911 if let Some(job) = jobs.pop_front() {
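                    // If a newer job with the same key is already queued, skip this one and
                    // let the later job do the work.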
3912 if let Some(current_key) = &job.key {
3913 if jobs
3914 .iter()
3915 .any(|other_job| other_job.key.as_ref() == Some(current_key))
3916 {
3917 continue;
3918 }
3919 }
3920 (job.job)(state.clone(), cx).await;
3921 } else if let Some(job) = job_rx.next().await {
3922 jobs.push_back(job);
3923 } else {
3924 break;
3925 }
3926 }
3927 anyhow::Ok(())
3928 })
3929 .detach_and_log_err(cx);
3930
3931 job_tx
3932 }
3933
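    /// Spawns the background worker that services git jobs for a remote repository, handing each
    /// job the project id and RPC client; keyed jobs are deduplicated the same way as in the
    /// local worker.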
3934 fn spawn_remote_git_worker(
3935 project_id: ProjectId,
3936 client: AnyProtoClient,
3937 cx: &mut Context<Self>,
3938 ) -> mpsc::UnboundedSender<GitJob> {
3939 let (job_tx, mut job_rx) = mpsc::unbounded::<GitJob>();
3940
3941 cx.spawn(async move |_, cx| {
3942 let state = RepositoryState::Remote { project_id, client };
3943 let mut jobs = VecDeque::new();
3944 loop {
3945 while let Ok(Some(next_job)) = job_rx.try_next() {
3946 jobs.push_back(next_job);
3947 }
3948
3949 if let Some(job) = jobs.pop_front() {
3950 if let Some(current_key) = &job.key {
3951 if jobs
3952 .iter()
3953 .any(|other_job| other_job.key.as_ref() == Some(current_key))
3954 {
3955 continue;
3956 }
3957 }
3958 (job.job)(state.clone(), cx).await;
3959 } else if let Some(job) = job_rx.next().await {
3960 jobs.push_back(job);
3961 } else {
3962 break;
3963 }
3964 }
3965 anyhow::Ok(())
3966 })
3967 .detach_and_log_err(cx);
3968
3969 job_tx
3970 }
3971
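    /// Loads the staged (index) text for the given path, reading it from the local backend or
    /// requesting it from the remote host via `OpenUnstagedDiff`.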
3972 fn load_staged_text(
3973 &mut self,
3974 buffer_id: BufferId,
3975 repo_path: RepoPath,
3976 cx: &App,
3977 ) -> Task<Result<Option<String>>> {
3978 let rx = self.send_job(None, move |state, _| async move {
3979 match state {
3980 RepositoryState::Local { backend, .. } => {
3981 anyhow::Ok(backend.load_index_text(repo_path).await)
3982 }
3983 RepositoryState::Remote { project_id, client } => {
3984 let response = client
3985 .request(proto::OpenUnstagedDiff {
3986 project_id: project_id.to_proto(),
3987 buffer_id: buffer_id.to_proto(),
3988 })
3989 .await?;
3990 Ok(response.staged_text)
3991 }
3992 }
3993 });
3994 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
3995 }
3996
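    /// Loads the committed (HEAD) and staged texts for the given path, collapsing them into a
    /// single base when the index matches HEAD so both diff bases can share one text.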
3997 fn load_committed_text(
3998 &mut self,
3999 buffer_id: BufferId,
4000 repo_path: RepoPath,
4001 cx: &App,
4002 ) -> Task<Result<DiffBasesChange>> {
4003 let rx = self.send_job(None, move |state, _| async move {
4004 match state {
4005 RepositoryState::Local { backend, .. } => {
4006 let committed_text = backend.load_committed_text(repo_path.clone()).await;
4007 let staged_text = backend.load_index_text(repo_path).await;
4008 let diff_bases_change = if committed_text == staged_text {
4009 DiffBasesChange::SetBoth(committed_text)
4010 } else {
4011 DiffBasesChange::SetEach {
4012 index: staged_text,
4013 head: committed_text,
4014 }
4015 };
4016 anyhow::Ok(diff_bases_change)
4017 }
4018 RepositoryState::Remote { project_id, client } => {
4019 use proto::open_uncommitted_diff_response::Mode;
4020
4021 let response = client
4022 .request(proto::OpenUncommittedDiff {
4023 project_id: project_id.to_proto(),
4024 buffer_id: buffer_id.to_proto(),
4025 })
4026 .await?;
4027 let mode =
4028 Mode::from_i32(response.mode).ok_or_else(|| anyhow!("Invalid mode"))?;
4029 let bases = match mode {
4030 Mode::IndexMatchesHead => DiffBasesChange::SetBoth(response.committed_text),
4031 Mode::IndexAndHead => DiffBasesChange::SetEach {
4032 head: response.committed_text,
4033 index: response.staged_text,
4034 },
4035 };
4036 Ok(bases)
4037 }
4038 }
4039 });
4040
4041 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
4042 }
4043
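    /// Records paths whose git status may have changed and schedules a keyed refresh job that
    /// re-queries their statuses, edits the snapshot with any differences, and notifies
    /// downstream observers.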
4044 fn paths_changed(
4045 &mut self,
4046 paths: Vec<RepoPath>,
4047 updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
4048 cx: &mut Context<Self>,
4049 ) {
4050 self.paths_needing_status_update.extend(paths);
4051
4052 let this = cx.weak_entity();
4053 let _ = self.send_keyed_job(
4054 Some(GitJobKey::RefreshStatuses),
4055 None,
4056 |state, mut cx| async move {
4057 let (prev_snapshot, mut changed_paths) = this.update(&mut cx, |this, _| {
4058 (
4059 this.snapshot.clone(),
4060 mem::take(&mut this.paths_needing_status_update),
4061 )
4062 })?;
4063 let RepositoryState::Local { backend, .. } = state else {
4064 bail!("not a local repository")
4065 };
4066
4067 let paths = changed_paths.iter().cloned().collect::<Vec<_>>();
4068 let statuses = backend.status(&paths).await?;
4069
4070 let changed_path_statuses = cx
4071 .background_spawn(async move {
4072 let mut changed_path_statuses = Vec::new();
4073 let prev_statuses = prev_snapshot.statuses_by_path.clone();
4074 let mut cursor = prev_statuses.cursor::<PathProgress>(&());
4075
4076 for (repo_path, status) in &*statuses.entries {
4077 changed_paths.remove(repo_path);
                            if cursor.seek_forward(&PathTarget::Path(repo_path), Bias::Left, &())
                                && cursor.item().is_some_and(|entry| entry.status == *status)
                            {
                                continue;
                            }
4083
4084 changed_path_statuses.push(Edit::Insert(StatusEntry {
4085 repo_path: repo_path.clone(),
4086 status: *status,
4087 }));
4088 }
4089 let mut cursor = prev_statuses.cursor::<PathProgress>(&());
4090 for path in changed_paths.iter() {
                            if cursor.seek_forward(&PathTarget::Path(path), Bias::Left, &()) {
4092 changed_path_statuses.push(Edit::Remove(PathKey(path.0.clone())));
4093 }
4094 }
4095 changed_path_statuses
4096 })
4097 .await;
4098
4099 this.update(&mut cx, |this, cx| {
4100 if !changed_path_statuses.is_empty() {
4101 this.snapshot
4102 .statuses_by_path
4103 .edit(changed_path_statuses, &());
4104 this.snapshot.scan_id += 1;
4105 if let Some(updates_tx) = updates_tx {
4106 updates_tx
4107 .unbounded_send(DownstreamUpdate::UpdateRepository(
4108 this.snapshot.clone(),
4109 ))
4110 .ok();
4111 }
4112 }
4113 cx.emit(RepositoryEvent::Updated { full_scan: false });
4114 })
4115 },
4116 );
4117 }
4118
    /// Returns the currently running git command and the time it started, if any.
4120 pub fn current_job(&self) -> Option<JobInfo> {
4121 self.active_jobs.values().next().cloned()
4122 }
4123
4124 pub fn barrier(&mut self) -> oneshot::Receiver<()> {
4125 self.send_job(None, |_, _| async {})
4126 }
4127}
4128
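/// Builds a permalink to a file that lives inside a crate unpacked from the Rust registry by
/// locating the crate's `.cargo_vcs_info.json` and `Cargo.toml` to recover the upstream
/// repository URL and the commit SHA the crate was published from.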
4129fn get_permalink_in_rust_registry_src(
4130 provider_registry: Arc<GitHostingProviderRegistry>,
4131 path: PathBuf,
4132 selection: Range<u32>,
4133) -> Result<url::Url> {
4134 #[derive(Deserialize)]
4135 struct CargoVcsGit {
4136 sha1: String,
4137 }
4138
4139 #[derive(Deserialize)]
4140 struct CargoVcsInfo {
4141 git: CargoVcsGit,
4142 path_in_vcs: String,
4143 }
4144
4145 #[derive(Deserialize)]
4146 struct CargoPackage {
4147 repository: String,
4148 }
4149
4150 #[derive(Deserialize)]
4151 struct CargoToml {
4152 package: CargoPackage,
4153 }
4154
4155 let Some((dir, cargo_vcs_info_json)) = path.ancestors().skip(1).find_map(|dir| {
4156 let json = std::fs::read_to_string(dir.join(".cargo_vcs_info.json")).ok()?;
4157 Some((dir, json))
4158 }) else {
4159 bail!("No .cargo_vcs_info.json found in parent directories")
4160 };
4161 let cargo_vcs_info = serde_json::from_str::<CargoVcsInfo>(&cargo_vcs_info_json)?;
4162 let cargo_toml = std::fs::read_to_string(dir.join("Cargo.toml"))?;
4163 let manifest = toml::from_str::<CargoToml>(&cargo_toml)?;
4164 let (provider, remote) = parse_git_remote_url(provider_registry, &manifest.package.repository)
4165 .ok_or_else(|| anyhow!("Failed to parse package.repository field of manifest"))?;
4166 let path = PathBuf::from(cargo_vcs_info.path_in_vcs).join(path.strip_prefix(dir).unwrap());
4167 let permalink = provider.build_permalink(
4168 remote,
4169 BuildPermalinkParams {
4170 sha: &cargo_vcs_info.git.sha1,
4171 path: &path.to_string_lossy(),
4172 selection: Some(selection),
4173 },
4174 );
4175 Ok(permalink)
4176}
4177
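/// Converts an optional blame result into its protobuf response representation.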
4178fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
4179 let Some(blame) = blame else {
4180 return proto::BlameBufferResponse {
4181 blame_response: None,
4182 };
4183 };
4184
4185 let entries = blame
4186 .entries
4187 .into_iter()
4188 .map(|entry| proto::BlameEntry {
4189 sha: entry.sha.as_bytes().into(),
4190 start_line: entry.range.start,
4191 end_line: entry.range.end,
4192 original_line_number: entry.original_line_number,
4193 author: entry.author.clone(),
4194 author_mail: entry.author_mail.clone(),
4195 author_time: entry.author_time,
4196 author_tz: entry.author_tz.clone(),
4197 committer: entry.committer_name.clone(),
4198 committer_mail: entry.committer_email.clone(),
4199 committer_time: entry.committer_time,
4200 committer_tz: entry.committer_tz.clone(),
4201 summary: entry.summary.clone(),
4202 previous: entry.previous.clone(),
4203 filename: entry.filename.clone(),
4204 })
4205 .collect::<Vec<_>>();
4206
4207 let messages = blame
4208 .messages
4209 .into_iter()
4210 .map(|(oid, message)| proto::CommitMessage {
4211 oid: oid.as_bytes().into(),
4212 message,
4213 })
4214 .collect::<Vec<_>>();
4215
4216 proto::BlameBufferResponse {
4217 blame_response: Some(proto::blame_buffer_response::BlameResponse {
4218 entries,
4219 messages,
4220 remote_url: blame.remote_url,
4221 }),
4222 }
4223}
4224
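/// Reconstructs a `git::blame::Blame` from its protobuf representation, dropping any entries or
/// commit messages whose object ids fail to parse.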
4225fn deserialize_blame_buffer_response(
4226 response: proto::BlameBufferResponse,
4227) -> Option<git::blame::Blame> {
4228 let response = response.blame_response?;
4229 let entries = response
4230 .entries
4231 .into_iter()
4232 .filter_map(|entry| {
4233 Some(git::blame::BlameEntry {
4234 sha: git::Oid::from_bytes(&entry.sha).ok()?,
4235 range: entry.start_line..entry.end_line,
4236 original_line_number: entry.original_line_number,
4237 committer_name: entry.committer,
4238 committer_time: entry.committer_time,
4239 committer_tz: entry.committer_tz,
4240 committer_email: entry.committer_mail,
4241 author: entry.author,
4242 author_mail: entry.author_mail,
4243 author_time: entry.author_time,
4244 author_tz: entry.author_tz,
4245 summary: entry.summary,
4246 previous: entry.previous,
4247 filename: entry.filename,
4248 })
4249 })
4250 .collect::<Vec<_>>();
4251
4252 let messages = response
4253 .messages
4254 .into_iter()
4255 .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
4256 .collect::<HashMap<_, _>>();
4257
4258 Some(Blame {
4259 entries,
4260 messages,
4261 remote_url: response.remote_url,
4262 })
4263}
4264
4265fn branch_to_proto(branch: &git::repository::Branch) -> proto::Branch {
4266 proto::Branch {
4267 is_head: branch.is_head,
4268 name: branch.name.to_string(),
4269 unix_timestamp: branch
4270 .most_recent_commit
4271 .as_ref()
4272 .map(|commit| commit.commit_timestamp as u64),
4273 upstream: branch.upstream.as_ref().map(|upstream| proto::GitUpstream {
4274 ref_name: upstream.ref_name.to_string(),
4275 tracking: upstream
4276 .tracking
4277 .status()
4278 .map(|upstream| proto::UpstreamTracking {
4279 ahead: upstream.ahead as u64,
4280 behind: upstream.behind as u64,
4281 }),
4282 }),
4283 most_recent_commit: branch
4284 .most_recent_commit
4285 .as_ref()
4286 .map(|commit| proto::CommitSummary {
4287 sha: commit.sha.to_string(),
4288 subject: commit.subject.to_string(),
4289 commit_timestamp: commit.commit_timestamp,
4290 }),
4291 }
4292}
4293
4294fn proto_to_branch(proto: &proto::Branch) -> git::repository::Branch {
4295 git::repository::Branch {
4296 is_head: proto.is_head,
4297 name: proto.name.clone().into(),
4298 upstream: proto
4299 .upstream
4300 .as_ref()
4301 .map(|upstream| git::repository::Upstream {
4302 ref_name: upstream.ref_name.to_string().into(),
4303 tracking: upstream
4304 .tracking
4305 .as_ref()
4306 .map(|tracking| {
4307 git::repository::UpstreamTracking::Tracked(UpstreamTrackingStatus {
4308 ahead: tracking.ahead as u32,
4309 behind: tracking.behind as u32,
4310 })
4311 })
4312 .unwrap_or(git::repository::UpstreamTracking::Gone),
4313 }),
4314 most_recent_commit: proto.most_recent_commit.as_ref().map(|commit| {
4315 git::repository::CommitSummary {
4316 sha: commit.sha.to_string().into(),
4317 subject: commit.subject.to_string().into(),
4318 commit_timestamp: commit.commit_timestamp,
4319 has_parent: true,
4320 }
4321 }),
4322 }
4323}
4324
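/// Queries the backend for the current branch, statuses, and merge state, and builds a new
/// repository snapshot along with the events implied by how it differs from the previous one.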
4325async fn compute_snapshot(
4326 id: RepositoryId,
4327 work_directory_abs_path: Arc<Path>,
4328 prev_snapshot: RepositorySnapshot,
4329 backend: Arc<dyn GitRepository>,
4330) -> Result<(RepositorySnapshot, Vec<RepositoryEvent>)> {
4331 let mut events = Vec::new();
4332 let branches = backend.branches().await?;
4333 let branch = branches.into_iter().find(|branch| branch.is_head);
4334 let statuses = backend.status(&[WORK_DIRECTORY_REPO_PATH.clone()]).await?;
    let merge_message = backend
        .merge_message()
        .await
        .and_then(|msg| msg.lines().next().map(|line| line.to_owned().into()));
4339 let merge_head_shas = backend
4340 .merge_head_shas()
4341 .into_iter()
4342 .map(SharedString::from)
4343 .collect();
4344
4345 let statuses_by_path = SumTree::from_iter(
4346 statuses
4347 .entries
4348 .iter()
4349 .map(|(repo_path, status)| StatusEntry {
4350 repo_path: repo_path.clone(),
4351 status: *status,
4352 }),
4353 &(),
4354 );
4355
4356 let merge_head_shas_changed = merge_head_shas != prev_snapshot.merge_head_shas;
4357
4358 if merge_head_shas_changed
4359 || branch != prev_snapshot.branch
4360 || statuses_by_path != prev_snapshot.statuses_by_path
4361 {
4362 events.push(RepositoryEvent::Updated { full_scan: true });
4363 }
4364
4365 let mut current_merge_conflicts = TreeSet::default();
4366 for (repo_path, status) in statuses.entries.iter() {
4367 if status.is_conflicted() {
4368 current_merge_conflicts.insert(repo_path.clone());
4369 }
4370 }
4371
    // Cache merge conflict paths so that staging or unstaging files doesn't change them;
    // they are only refreshed when the merge heads change (at commit time, etc.).
4374 let mut merge_conflicts = prev_snapshot.merge_conflicts.clone();
4375 if merge_head_shas_changed {
4376 merge_conflicts = current_merge_conflicts;
4377 events.push(RepositoryEvent::MergeHeadsChanged);
4378 }
4379
4380 let snapshot = RepositorySnapshot {
4381 id,
4382 merge_message,
4383 statuses_by_path,
4384 work_directory_abs_path,
4385 scan_id: prev_snapshot.scan_id + 1,
4386 branch,
4387 merge_conflicts,
4388 merge_head_shas,
4389 };
4390
4391 Ok((snapshot, events))
4392}
4393
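/// Converts a protobuf git file status into a `FileStatus`, falling back to the legacy
/// `simple_status` code when no structured variant is present.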
4394fn status_from_proto(
4395 simple_status: i32,
4396 status: Option<proto::GitFileStatus>,
4397) -> anyhow::Result<FileStatus> {
4398 use proto::git_file_status::Variant;
4399
4400 let Some(variant) = status.and_then(|status| status.variant) else {
4401 let code = proto::GitStatus::from_i32(simple_status)
4402 .ok_or_else(|| anyhow!("Invalid git status code: {simple_status}"))?;
4403 let result = match code {
4404 proto::GitStatus::Added => TrackedStatus {
4405 worktree_status: StatusCode::Added,
4406 index_status: StatusCode::Unmodified,
4407 }
4408 .into(),
4409 proto::GitStatus::Modified => TrackedStatus {
4410 worktree_status: StatusCode::Modified,
4411 index_status: StatusCode::Unmodified,
4412 }
4413 .into(),
4414 proto::GitStatus::Conflict => UnmergedStatus {
4415 first_head: UnmergedStatusCode::Updated,
4416 second_head: UnmergedStatusCode::Updated,
4417 }
4418 .into(),
4419 proto::GitStatus::Deleted => TrackedStatus {
4420 worktree_status: StatusCode::Deleted,
4421 index_status: StatusCode::Unmodified,
4422 }
4423 .into(),
4424 _ => return Err(anyhow!("Invalid code for simple status: {simple_status}")),
4425 };
4426 return Ok(result);
4427 };
4428
4429 let result = match variant {
4430 Variant::Untracked(_) => FileStatus::Untracked,
4431 Variant::Ignored(_) => FileStatus::Ignored,
4432 Variant::Unmerged(unmerged) => {
4433 let [first_head, second_head] =
4434 [unmerged.first_head, unmerged.second_head].map(|head| {
4435 let code = proto::GitStatus::from_i32(head)
4436 .ok_or_else(|| anyhow!("Invalid git status code: {head}"))?;
4437 let result = match code {
4438 proto::GitStatus::Added => UnmergedStatusCode::Added,
4439 proto::GitStatus::Updated => UnmergedStatusCode::Updated,
4440 proto::GitStatus::Deleted => UnmergedStatusCode::Deleted,
4441 _ => return Err(anyhow!("Invalid code for unmerged status: {code:?}")),
4442 };
4443 Ok(result)
4444 });
4445 let [first_head, second_head] = [first_head?, second_head?];
4446 UnmergedStatus {
4447 first_head,
4448 second_head,
4449 }
4450 .into()
4451 }
4452 Variant::Tracked(tracked) => {
4453 let [index_status, worktree_status] = [tracked.index_status, tracked.worktree_status]
4454 .map(|status| {
4455 let code = proto::GitStatus::from_i32(status)
4456 .ok_or_else(|| anyhow!("Invalid git status code: {status}"))?;
4457 let result = match code {
4458 proto::GitStatus::Modified => StatusCode::Modified,
4459 proto::GitStatus::TypeChanged => StatusCode::TypeChanged,
4460 proto::GitStatus::Added => StatusCode::Added,
4461 proto::GitStatus::Deleted => StatusCode::Deleted,
4462 proto::GitStatus::Renamed => StatusCode::Renamed,
4463 proto::GitStatus::Copied => StatusCode::Copied,
4464 proto::GitStatus::Unmodified => StatusCode::Unmodified,
4465 _ => return Err(anyhow!("Invalid code for tracked status: {code:?}")),
4466 };
4467 Ok(result)
4468 });
4469 let [index_status, worktree_status] = [index_status?, worktree_status?];
4470 TrackedStatus {
4471 index_status,
4472 worktree_status,
4473 }
4474 .into()
4475 }
4476 };
4477 Ok(result)
4478}
4479
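/// Converts a `FileStatus` into its protobuf representation.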
4480fn status_to_proto(status: FileStatus) -> proto::GitFileStatus {
4481 use proto::git_file_status::{Tracked, Unmerged, Variant};
4482
4483 let variant = match status {
4484 FileStatus::Untracked => Variant::Untracked(Default::default()),
4485 FileStatus::Ignored => Variant::Ignored(Default::default()),
4486 FileStatus::Unmerged(UnmergedStatus {
4487 first_head,
4488 second_head,
4489 }) => Variant::Unmerged(Unmerged {
4490 first_head: unmerged_status_to_proto(first_head),
4491 second_head: unmerged_status_to_proto(second_head),
4492 }),
4493 FileStatus::Tracked(TrackedStatus {
4494 index_status,
4495 worktree_status,
4496 }) => Variant::Tracked(Tracked {
4497 index_status: tracked_status_to_proto(index_status),
4498 worktree_status: tracked_status_to_proto(worktree_status),
4499 }),
4500 };
4501 proto::GitFileStatus {
4502 variant: Some(variant),
4503 }
4504}
4505
4506fn unmerged_status_to_proto(code: UnmergedStatusCode) -> i32 {
4507 match code {
4508 UnmergedStatusCode::Added => proto::GitStatus::Added as _,
4509 UnmergedStatusCode::Deleted => proto::GitStatus::Deleted as _,
4510 UnmergedStatusCode::Updated => proto::GitStatus::Updated as _,
4511 }
4512}
4513
4514fn tracked_status_to_proto(code: StatusCode) -> i32 {
4515 match code {
4516 StatusCode::Added => proto::GitStatus::Added as _,
4517 StatusCode::Deleted => proto::GitStatus::Deleted as _,
4518 StatusCode::Modified => proto::GitStatus::Modified as _,
4519 StatusCode::Renamed => proto::GitStatus::Renamed as _,
4520 StatusCode::TypeChanged => proto::GitStatus::TypeChanged as _,
4521 StatusCode::Copied => proto::GitStatus::Copied as _,
4522 StatusCode::Unmodified => proto::GitStatus::Unmodified as _,
4523 }
4524}