1pub mod git_traversal;
2
3use crate::{
4 ProjectEnvironment, ProjectItem, ProjectPath,
5 buffer_store::{BufferStore, BufferStoreEvent},
6 worktree_store::{WorktreeStore, WorktreeStoreEvent},
7};
8use anyhow::{Context as _, Result, anyhow, bail};
9use askpass::AskPassDelegate;
10use buffer_diff::{BufferDiff, BufferDiffEvent};
11use client::ProjectId;
12use collections::HashMap;
13use fs::Fs;
14use futures::{
15 FutureExt as _, StreamExt as _,
16 channel::{mpsc, oneshot},
17 future::{self, Shared},
18};
19use git::{
20 BuildPermalinkParams, GitHostingProviderRegistry, WORK_DIRECTORY_REPO_PATH,
21 blame::Blame,
22 parse_git_remote_url,
23 repository::{
24 Branch, CommitDetails, CommitDiff, CommitFile, DiffType, GitRepository,
25 GitRepositoryCheckpoint, PushOptions, Remote, RemoteCommandOutput, RepoPath, ResetMode,
26 UpstreamTrackingStatus,
27 },
28 status::{
29 FileStatus, GitSummary, StatusCode, TrackedStatus, UnmergedStatus, UnmergedStatusCode,
30 },
31};
32use gpui::{
33 App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Subscription, Task,
34 WeakEntity,
35};
36use language::{
37 Buffer, BufferEvent, Language, LanguageRegistry,
38 proto::{deserialize_version, serialize_version},
39};
40use parking_lot::Mutex;
41use postage::stream::Stream as _;
42use rpc::{
43 AnyProtoClient, TypedEnvelope,
44 proto::{self, FromProto, SSH_PROJECT_ID, ToProto, git_reset, split_repository_update},
45};
46use serde::Deserialize;
47use std::{
48 cmp::Ordering,
49 collections::{BTreeSet, VecDeque},
50 future::Future,
51 mem,
52 ops::Range,
53 path::{Path, PathBuf},
54 sync::{
55 Arc,
56 atomic::{self, AtomicU64},
57 },
58 time::Instant,
59};
60use sum_tree::{Edit, SumTree, TreeSet};
61use text::{Bias, BufferId};
62use util::{ResultExt, debug_panic, post_inc};
63use worktree::{
64 File, PathKey, PathProgress, PathSummary, PathTarget, UpdatedGitRepositoriesSet,
65 UpdatedGitRepository, Worktree,
66};
67
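/// Central store for a project's git state: the repositories discovered in the
/// project's worktrees, the unstaged/uncommitted diff state for open buffers,
/// and the bookkeeping needed to replicate that state to and from remote
/// collaborators.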
68pub struct GitStore {
69 state: GitStoreState,
70 buffer_store: Entity<BufferStore>,
71 worktree_store: Entity<WorktreeStore>,
72 repositories: HashMap<RepositoryId, Entity<Repository>>,
73 active_repo_id: Option<RepositoryId>,
74 #[allow(clippy::type_complexity)]
75 loading_diffs:
76 HashMap<(BufferId, DiffKind), Shared<Task<Result<Entity<BufferDiff>, Arc<anyhow::Error>>>>>,
77 diffs: HashMap<BufferId, Entity<BufferDiffState>>,
78 shared_diffs: HashMap<proto::PeerId, HashMap<BufferId, SharedDiffs>>,
79 _subscriptions: Vec<Subscription>,
80}
81
82#[derive(Default)]
83struct SharedDiffs {
84 unstaged: Option<Entity<BufferDiff>>,
85 uncommitted: Option<Entity<BufferDiff>>,
86}
87
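/// Per-buffer diff state: weak handles to the buffer's unstaged and
/// uncommitted diffs, plus the base texts and change flags used to decide when
/// those diffs need to be recalculated.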
88struct BufferDiffState {
89 unstaged_diff: Option<WeakEntity<BufferDiff>>,
90 uncommitted_diff: Option<WeakEntity<BufferDiff>>,
91 recalculate_diff_task: Option<Task<Result<()>>>,
92 language: Option<Arc<Language>>,
93 language_registry: Option<Arc<LanguageRegistry>>,
94 recalculating_tx: postage::watch::Sender<bool>,
95
96 /// These operation counts are used to ensure that head and index text
97 /// values read from the git repository are up-to-date with any hunk staging
98 /// operations that have been performed on the BufferDiff.
99 ///
100 /// The operation count is incremented immediately when the user initiates a
101 /// hunk stage/unstage operation. Then, upon finishing writing the new index
102 /// text to disk, `hunk_staging_operation_count_as_of_write` is updated to
103 /// reflect the operation count that prompted the write.
104 hunk_staging_operation_count: usize,
105 hunk_staging_operation_count_as_of_write: usize,
106
107 head_text: Option<Arc<String>>,
108 index_text: Option<Arc<String>>,
109 head_changed: bool,
110 index_changed: bool,
111 language_changed: bool,
112}
113
114#[derive(Clone, Debug)]
115enum DiffBasesChange {
116 SetIndex(Option<String>),
117 SetHead(Option<String>),
118 SetEach {
119 index: Option<String>,
120 head: Option<String>,
121 },
122 SetBoth(Option<String>),
123}
124
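/// Which base a buffer is diffed against: `Unstaged` compares the buffer with
/// the index, `Uncommitted` compares it with HEAD.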
125#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
126enum DiffKind {
127 Unstaged,
128 Uncommitted,
129}
130
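/// Where this store's git data comes from: a `Local` store reads repositories
/// from disk, while `Ssh` and `Remote` stores forward requests to an upstream
/// client. `Local` and `Ssh` stores can additionally replicate state to a
/// downstream client when the project is shared.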
131enum GitStoreState {
132 Local {
133 next_repository_id: Arc<AtomicU64>,
134 downstream: Option<LocalDownstreamState>,
135 project_environment: Entity<ProjectEnvironment>,
136 fs: Arc<dyn Fs>,
137 },
138 Ssh {
139 upstream_client: AnyProtoClient,
140 upstream_project_id: ProjectId,
141 downstream: Option<(AnyProtoClient, ProjectId)>,
142 },
143 Remote {
144 upstream_client: AnyProtoClient,
145 upstream_project_id: ProjectId,
146 },
147}
148
149enum DownstreamUpdate {
150 UpdateRepository(RepositorySnapshot),
151 RemoveRepository(RepositoryId),
152}
153
154struct LocalDownstreamState {
155 client: AnyProtoClient,
156 project_id: ProjectId,
157 updates_tx: mpsc::UnboundedSender<DownstreamUpdate>,
158 _task: Task<Result<()>>,
159}
160
161#[derive(Clone)]
162pub struct GitStoreCheckpoint {
163 checkpoints_by_work_dir_abs_path: HashMap<Arc<Path>, GitRepositoryCheckpoint>,
164}
165
166#[derive(Clone, Debug, PartialEq, Eq)]
167pub struct StatusEntry {
168 pub repo_path: RepoPath,
169 pub status: FileStatus,
170}
171
172impl StatusEntry {
173 fn to_proto(&self) -> proto::StatusEntry {
174 let simple_status = match self.status {
175 FileStatus::Ignored | FileStatus::Untracked => proto::GitStatus::Added as i32,
176 FileStatus::Unmerged { .. } => proto::GitStatus::Conflict as i32,
177 FileStatus::Tracked(TrackedStatus {
178 index_status,
179 worktree_status,
180 }) => tracked_status_to_proto(if worktree_status != StatusCode::Unmodified {
181 worktree_status
182 } else {
183 index_status
184 }),
185 };
186
187 proto::StatusEntry {
188 repo_path: self.repo_path.as_ref().to_proto(),
189 simple_status,
190 status: Some(status_to_proto(self.status)),
191 }
192 }
193}
194
195impl TryFrom<proto::StatusEntry> for StatusEntry {
196 type Error = anyhow::Error;
197
198 fn try_from(value: proto::StatusEntry) -> Result<Self, Self::Error> {
199 let repo_path = RepoPath(Arc::<Path>::from_proto(value.repo_path));
200 let status = status_from_proto(value.simple_status, value.status)?;
201 Ok(Self { repo_path, status })
202 }
203}
204
205impl sum_tree::Item for StatusEntry {
206 type Summary = PathSummary<GitSummary>;
207
208 fn summary(&self, _: &<Self::Summary as sum_tree::Summary>::Context) -> Self::Summary {
209 PathSummary {
210 max_path: self.repo_path.0.clone(),
211 item_summary: self.status.summary(),
212 }
213 }
214}
215
216impl sum_tree::KeyedItem for StatusEntry {
217 type Key = PathKey;
218
219 fn key(&self) -> Self::Key {
220 PathKey(self.repo_path.0.clone())
221 }
222}
223
224#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
225pub struct RepositoryId(pub u64);
226
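/// An immutable snapshot of a repository as of a particular scan: its work
/// directory, current branch, merge state, and per-path statuses.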
227#[derive(Clone, Debug, PartialEq, Eq)]
228pub struct RepositorySnapshot {
229 pub id: RepositoryId,
230 pub merge_message: Option<SharedString>,
231 pub statuses_by_path: SumTree<StatusEntry>,
232 pub work_directory_abs_path: Arc<Path>,
233 pub branch: Option<Branch>,
234 pub merge_conflicts: TreeSet<RepoPath>,
235 pub merge_head_shas: Vec<SharedString>,
236 pub scan_id: u64,
237}
238
239type JobId = u64;
240
241#[derive(Clone, Debug, PartialEq, Eq)]
242pub struct JobInfo {
243 pub start: Instant,
244 pub message: SharedString,
245}
246
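/// A single git repository tracked by the [`GitStore`]. Wraps the latest
/// [`RepositorySnapshot`] (exposed via `Deref`) and owns the job queue through
/// which git operations for this repository are dispatched.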
247pub struct Repository {
248 this: WeakEntity<Self>,
249 snapshot: RepositorySnapshot,
250 commit_message_buffer: Option<Entity<Buffer>>,
251 git_store: WeakEntity<GitStore>,
252 // For a local repository, holds paths that have had worktree events since the last status scan completed,
253 // and that should be examined during the next status scan.
254 paths_needing_status_update: BTreeSet<RepoPath>,
255 job_sender: mpsc::UnboundedSender<GitJob>,
256 active_jobs: HashMap<JobId, JobInfo>,
257 job_id: JobId,
258 askpass_delegates: Arc<Mutex<HashMap<u64, AskPassDelegate>>>,
259 latest_askpass_id: u64,
260}
261
262impl std::ops::Deref for Repository {
263 type Target = RepositorySnapshot;
264
265 fn deref(&self) -> &Self::Target {
266 &self.snapshot
267 }
268}
269
270#[derive(Clone)]
271pub enum RepositoryState {
272 Local {
273 backend: Arc<dyn GitRepository>,
274 environment: Arc<HashMap<String, String>>,
275 },
276 Remote {
277 project_id: ProjectId,
278 client: AnyProtoClient,
279 },
280}
281
282#[derive(Clone, Debug)]
283pub enum RepositoryEvent {
284 Updated { full_scan: bool },
285 MergeHeadsChanged,
286}
287
288#[derive(Clone, Debug)]
289pub struct JobsUpdated;
290
291#[derive(Debug)]
292pub enum GitStoreEvent {
293 ActiveRepositoryChanged(Option<RepositoryId>),
294 RepositoryUpdated(RepositoryId, RepositoryEvent, bool),
295 RepositoryAdded(RepositoryId),
296 RepositoryRemoved(RepositoryId),
297 IndexWriteError(anyhow::Error),
298 JobsUpdated,
299}
300
301impl EventEmitter<RepositoryEvent> for Repository {}
302impl EventEmitter<JobsUpdated> for Repository {}
303impl EventEmitter<GitStoreEvent> for GitStore {}
304
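/// A unit of git work to be run against the repository's backing state. The
/// optional key identifies the kind of job (e.g. writing a particular index
/// path), presumably so that redundant queued work can be coalesced.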
305pub struct GitJob {
306 job: Box<dyn FnOnce(RepositoryState, &mut AsyncApp) -> Task<()>>,
307 key: Option<GitJobKey>,
308}
309
310#[derive(PartialEq, Eq)]
311enum GitJobKey {
312 WriteIndex(RepoPath),
313 ReloadBufferDiffBases,
314 RefreshStatuses,
315 ReloadGitState,
316}
317
318impl GitStore {
319 pub fn local(
320 worktree_store: &Entity<WorktreeStore>,
321 buffer_store: Entity<BufferStore>,
322 environment: Entity<ProjectEnvironment>,
323 fs: Arc<dyn Fs>,
324 cx: &mut Context<Self>,
325 ) -> Self {
326 Self::new(
327 worktree_store.clone(),
328 buffer_store,
329 GitStoreState::Local {
330 next_repository_id: Arc::new(AtomicU64::new(1)),
331 downstream: None,
332 project_environment: environment,
333 fs,
334 },
335 cx,
336 )
337 }
338
339 pub fn remote(
340 worktree_store: &Entity<WorktreeStore>,
341 buffer_store: Entity<BufferStore>,
342 upstream_client: AnyProtoClient,
343 project_id: ProjectId,
344 cx: &mut Context<Self>,
345 ) -> Self {
346 Self::new(
347 worktree_store.clone(),
348 buffer_store,
349 GitStoreState::Remote {
350 upstream_client,
351 upstream_project_id: project_id,
352 },
353 cx,
354 )
355 }
356
357 pub fn ssh(
358 worktree_store: &Entity<WorktreeStore>,
359 buffer_store: Entity<BufferStore>,
360 upstream_client: AnyProtoClient,
361 cx: &mut Context<Self>,
362 ) -> Self {
363 Self::new(
364 worktree_store.clone(),
365 buffer_store,
366 GitStoreState::Ssh {
367 upstream_client,
368 upstream_project_id: ProjectId(SSH_PROJECT_ID),
369 downstream: None,
370 },
371 cx,
372 )
373 }
374
375 fn new(
376 worktree_store: Entity<WorktreeStore>,
377 buffer_store: Entity<BufferStore>,
378 state: GitStoreState,
379 cx: &mut Context<Self>,
380 ) -> Self {
381 let _subscriptions = vec![
382 cx.subscribe(&worktree_store, Self::on_worktree_store_event),
383 cx.subscribe(&buffer_store, Self::on_buffer_store_event),
384 ];
385
386 GitStore {
387 state,
388 buffer_store,
389 worktree_store,
390 repositories: HashMap::default(),
391 active_repo_id: None,
392 _subscriptions,
393 loading_diffs: HashMap::default(),
394 shared_diffs: HashMap::default(),
395 diffs: HashMap::default(),
396 }
397 }
398
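/// Registers the RPC handlers that allow peers to drive this store remotely.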
399 pub fn init(client: &AnyProtoClient) {
400 client.add_entity_request_handler(Self::handle_get_remotes);
401 client.add_entity_request_handler(Self::handle_get_branches);
402 client.add_entity_request_handler(Self::handle_change_branch);
403 client.add_entity_request_handler(Self::handle_create_branch);
404 client.add_entity_request_handler(Self::handle_git_init);
405 client.add_entity_request_handler(Self::handle_push);
406 client.add_entity_request_handler(Self::handle_pull);
407 client.add_entity_request_handler(Self::handle_fetch);
408 client.add_entity_request_handler(Self::handle_stage);
409 client.add_entity_request_handler(Self::handle_unstage);
410 client.add_entity_request_handler(Self::handle_commit);
411 client.add_entity_request_handler(Self::handle_reset);
412 client.add_entity_request_handler(Self::handle_show);
413 client.add_entity_request_handler(Self::handle_load_commit_diff);
414 client.add_entity_request_handler(Self::handle_checkout_files);
415 client.add_entity_request_handler(Self::handle_open_commit_message_buffer);
416 client.add_entity_request_handler(Self::handle_set_index_text);
417 client.add_entity_request_handler(Self::handle_askpass);
418 client.add_entity_request_handler(Self::handle_check_for_pushed_commits);
419 client.add_entity_request_handler(Self::handle_git_diff);
420 client.add_entity_request_handler(Self::handle_open_unstaged_diff);
421 client.add_entity_request_handler(Self::handle_open_uncommitted_diff);
422 client.add_entity_message_handler(Self::handle_update_diff_bases);
423 client.add_entity_request_handler(Self::handle_get_permalink_to_line);
424 client.add_entity_request_handler(Self::handle_blame_buffer);
425 client.add_entity_message_handler(Self::handle_update_repository);
426 client.add_entity_message_handler(Self::handle_remove_repository);
427 }
428
429 pub fn is_local(&self) -> bool {
430 matches!(self.state, GitStoreState::Local { .. })
431 }
432
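/// Begins replicating repository state to a downstream client for the given
/// shared project, sending an initial update for every known repository.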
433 pub fn shared(&mut self, project_id: u64, client: AnyProtoClient, cx: &mut Context<Self>) {
434 match &mut self.state {
435 GitStoreState::Ssh {
436 downstream: downstream_client,
437 ..
438 } => {
439 for repo in self.repositories.values() {
440 let update = repo.read(cx).snapshot.initial_update(project_id);
441 for update in split_repository_update(update) {
442 client.send(update).log_err();
443 }
444 }
445 *downstream_client = Some((client, ProjectId(project_id)));
446 }
447 GitStoreState::Local {
448 downstream: downstream_client,
449 ..
450 } => {
451 let mut snapshots = HashMap::default();
452 let (updates_tx, mut updates_rx) = mpsc::unbounded();
453 for repo in self.repositories.values() {
454 updates_tx
455 .unbounded_send(DownstreamUpdate::UpdateRepository(
456 repo.read(cx).snapshot.clone(),
457 ))
458 .ok();
459 }
460 *downstream_client = Some(LocalDownstreamState {
461 client: client.clone(),
462 project_id: ProjectId(project_id),
463 updates_tx,
464 _task: cx.spawn(async move |this, cx| {
465 cx.background_spawn(async move {
466 while let Some(update) = updates_rx.next().await {
467 match update {
468 DownstreamUpdate::UpdateRepository(snapshot) => {
469 if let Some(old_snapshot) = snapshots.get_mut(&snapshot.id)
470 {
471 let update =
472 snapshot.build_update(old_snapshot, project_id);
473 *old_snapshot = snapshot;
474 for update in split_repository_update(update) {
475 client.send(update)?;
476 }
477 } else {
478 let update = snapshot.initial_update(project_id);
479 for update in split_repository_update(update) {
480 client.send(update)?;
481 }
482 snapshots.insert(snapshot.id, snapshot);
483 }
484 }
485 DownstreamUpdate::RemoveRepository(id) => {
486 client.send(proto::RemoveRepository {
487 project_id,
488 id: id.to_proto(),
489 })?;
490 }
491 }
492 }
493 anyhow::Ok(())
494 })
495 .await
496 .ok();
497 this.update(cx, |this, _| {
498 if let GitStoreState::Local {
499 downstream: downstream_client,
500 ..
501 } = &mut this.state
502 {
503 downstream_client.take();
504 } else {
505 unreachable!("downstream update task should only run for a local store");
506 }
507 })
508 }),
509 });
510 }
511 GitStoreState::Remote { .. } => {
512 debug_panic!("shared called on remote store");
513 }
514 }
515 }
516
517 pub fn unshared(&mut self, _cx: &mut Context<Self>) {
518 match &mut self.state {
519 GitStoreState::Local {
520 downstream: downstream_client,
521 ..
522 } => {
523 downstream_client.take();
524 }
525 GitStoreState::Ssh {
526 downstream: downstream_client,
527 ..
528 } => {
529 downstream_client.take();
530 }
531 GitStoreState::Remote { .. } => {
532 debug_panic!("unshared called on remote store");
533 }
534 }
535 self.shared_diffs.clear();
536 }
537
538 pub(crate) fn forget_shared_diffs_for(&mut self, peer_id: &proto::PeerId) {
539 self.shared_diffs.remove(peer_id);
540 }
541
542 pub fn active_repository(&self) -> Option<Entity<Repository>> {
543 self.active_repo_id
544 .as_ref()
545 .map(|id| self.repositories[&id].clone())
546 }
547
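/// Returns the unstaged diff (buffer contents vs. the index) for the given
/// buffer, loading it if necessary. Concurrent callers share a single loading
/// task, and any in-flight recalculation is awaited before the diff is
/// returned.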
548 pub fn open_unstaged_diff(
549 &mut self,
550 buffer: Entity<Buffer>,
551 cx: &mut Context<Self>,
552 ) -> Task<Result<Entity<BufferDiff>>> {
553 let buffer_id = buffer.read(cx).remote_id();
554 if let Some(diff_state) = self.diffs.get(&buffer_id) {
555 if let Some(unstaged_diff) = diff_state
556 .read(cx)
557 .unstaged_diff
558 .as_ref()
559 .and_then(|weak| weak.upgrade())
560 {
561 if let Some(task) =
562 diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
563 {
564 return cx.background_executor().spawn(async move {
565 task.await;
566 Ok(unstaged_diff)
567 });
568 }
569 return Task::ready(Ok(unstaged_diff));
570 }
571 }
572
573 let Some((repo, repo_path)) =
574 self.repository_and_path_for_buffer_id(buffer.read(cx).remote_id(), cx)
575 else {
576 return Task::ready(Err(anyhow!("failed to find git repository for buffer")));
577 };
578
579 let task = self
580 .loading_diffs
581 .entry((buffer_id, DiffKind::Unstaged))
582 .or_insert_with(|| {
583 let staged_text = repo.update(cx, |repo, cx| {
584 repo.load_staged_text(buffer_id, repo_path, cx)
585 });
586 cx.spawn(async move |this, cx| {
587 Self::open_diff_internal(
588 this,
589 DiffKind::Unstaged,
590 staged_text.await.map(DiffBasesChange::SetIndex),
591 buffer,
592 cx,
593 )
594 .await
595 .map_err(Arc::new)
596 })
597 .shared()
598 })
599 .clone();
600
601 cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
602 }
603
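/// Returns the uncommitted diff (buffer contents vs. HEAD) for the given
/// buffer, loading it if necessary. The buffer's unstaged diff is attached as
/// the secondary diff.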
604 pub fn open_uncommitted_diff(
605 &mut self,
606 buffer: Entity<Buffer>,
607 cx: &mut Context<Self>,
608 ) -> Task<Result<Entity<BufferDiff>>> {
609 let buffer_id = buffer.read(cx).remote_id();
610
611 if let Some(diff_state) = self.diffs.get(&buffer_id) {
612 if let Some(uncommitted_diff) = diff_state
613 .read(cx)
614 .uncommitted_diff
615 .as_ref()
616 .and_then(|weak| weak.upgrade())
617 {
618 if let Some(task) =
619 diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
620 {
621 return cx.background_executor().spawn(async move {
622 task.await;
623 Ok(uncommitted_diff)
624 });
625 }
626 return Task::ready(Ok(uncommitted_diff));
627 }
628 }
629
630 let Some((repo, repo_path)) =
631 self.repository_and_path_for_buffer_id(buffer.read(cx).remote_id(), cx)
632 else {
633 return Task::ready(Err(anyhow!("failed to find git repository for buffer")));
634 };
635
636 let task = self
637 .loading_diffs
638 .entry((buffer_id, DiffKind::Uncommitted))
639 .or_insert_with(|| {
640 let changes = repo.update(cx, |repo, cx| {
641 repo.load_committed_text(buffer_id, repo_path, cx)
642 });
643
644 cx.spawn(async move |this, cx| {
645 Self::open_diff_internal(this, DiffKind::Uncommitted, changes.await, buffer, cx)
646 .await
647 .map_err(Arc::new)
648 })
649 .shared()
650 })
651 .clone();
652
653 cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
654 }
655
656 async fn open_diff_internal(
657 this: WeakEntity<Self>,
658 kind: DiffKind,
659 texts: Result<DiffBasesChange>,
660 buffer_entity: Entity<Buffer>,
661 cx: &mut AsyncApp,
662 ) -> Result<Entity<BufferDiff>> {
663 let diff_bases_change = match texts {
664 Err(e) => {
665 this.update(cx, |this, cx| {
666 let buffer = buffer_entity.read(cx);
667 let buffer_id = buffer.remote_id();
668 this.loading_diffs.remove(&(buffer_id, kind));
669 })?;
670 return Err(e);
671 }
672 Ok(change) => change,
673 };
674
675 this.update(cx, |this, cx| {
676 let buffer = buffer_entity.read(cx);
677 let buffer_id = buffer.remote_id();
678 let language = buffer.language().cloned();
679 let language_registry = buffer.language_registry();
680 let text_snapshot = buffer.text_snapshot();
681 this.loading_diffs.remove(&(buffer_id, kind));
682
683 let diff_state = this
684 .diffs
685 .entry(buffer_id)
686 .or_insert_with(|| cx.new(|_| BufferDiffState::default()));
687
688 let diff = cx.new(|cx| BufferDiff::new(&text_snapshot, cx));
689
690 cx.subscribe(&diff, Self::on_buffer_diff_event).detach();
691 diff_state.update(cx, |diff_state, cx| {
692 diff_state.language = language;
693 diff_state.language_registry = language_registry;
694
695 match kind {
696 DiffKind::Unstaged => diff_state.unstaged_diff = Some(diff.downgrade()),
697 DiffKind::Uncommitted => {
698 let unstaged_diff = if let Some(diff) = diff_state.unstaged_diff() {
699 diff
700 } else {
701 let unstaged_diff = cx.new(|cx| BufferDiff::new(&text_snapshot, cx));
702 diff_state.unstaged_diff = Some(unstaged_diff.downgrade());
703 unstaged_diff
704 };
705
706 diff.update(cx, |diff, _| diff.set_secondary_diff(unstaged_diff));
707 diff_state.uncommitted_diff = Some(diff.downgrade())
708 }
709 }
710
711 diff_state.diff_bases_changed(text_snapshot, Some(diff_bases_change), cx);
712 let rx = diff_state.wait_for_recalculation();
713
714 anyhow::Ok(async move {
715 if let Some(rx) = rx {
716 rx.await;
717 }
718 Ok(diff)
719 })
720 })
721 })??
722 .await
723 }
724
725 pub fn get_unstaged_diff(&self, buffer_id: BufferId, cx: &App) -> Option<Entity<BufferDiff>> {
726 let diff_state = self.diffs.get(&buffer_id)?;
727 diff_state.read(cx).unstaged_diff.as_ref()?.upgrade()
728 }
729
730 pub fn get_uncommitted_diff(
731 &self,
732 buffer_id: BufferId,
733 cx: &App,
734 ) -> Option<Entity<BufferDiff>> {
735 let diff_state = self.diffs.get(&buffer_id)?;
736 diff_state.read(cx).uncommitted_diff.as_ref()?.upgrade()
737 }
738
739 pub fn project_path_git_status(
740 &self,
741 project_path: &ProjectPath,
742 cx: &App,
743 ) -> Option<FileStatus> {
744 let (repo, repo_path) = self.repository_and_path_for_project_path(project_path, cx)?;
745 Some(repo.read(cx).status_for_path(&repo_path)?.status)
746 }
747
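/// Captures a checkpoint of every repository in the store, keyed by work
/// directory, so that the project's git state can later be compared against or
/// restored to this point.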
748 pub fn checkpoint(&self, cx: &mut App) -> Task<Result<GitStoreCheckpoint>> {
749 let mut work_directory_abs_paths = Vec::new();
750 let mut checkpoints = Vec::new();
751 for repository in self.repositories.values() {
752 repository.update(cx, |repository, _| {
753 work_directory_abs_paths.push(repository.snapshot.work_directory_abs_path.clone());
754 checkpoints.push(repository.checkpoint().map(|checkpoint| checkpoint?));
755 });
756 }
757
758 cx.background_executor().spawn(async move {
759 let checkpoints = future::try_join_all(checkpoints).await?;
760 Ok(GitStoreCheckpoint {
761 checkpoints_by_work_dir_abs_path: work_directory_abs_paths
762 .into_iter()
763 .zip(checkpoints)
764 .collect(),
765 })
766 })
767 }
768
769 pub fn restore_checkpoint(
770 &self,
771 checkpoint: GitStoreCheckpoint,
772 cx: &mut App,
773 ) -> Task<Result<()>> {
774 let repositories_by_work_dir_abs_path = self
775 .repositories
776 .values()
777 .map(|repo| (repo.read(cx).snapshot.work_directory_abs_path.clone(), repo))
778 .collect::<HashMap<_, _>>();
779
780 let mut tasks = Vec::new();
781 for (work_dir_abs_path, checkpoint) in checkpoint.checkpoints_by_work_dir_abs_path {
782 if let Some(repository) = repositories_by_work_dir_abs_path.get(&work_dir_abs_path) {
783 let restore = repository.update(cx, |repository, _| {
784 repository.restore_checkpoint(checkpoint)
785 });
786 tasks.push(async move { restore.await? });
787 }
788 }
789 cx.background_spawn(async move {
790 future::try_join_all(tasks).await?;
791 Ok(())
792 })
793 }
794
795 /// Compares two checkpoints, returning true if they are equal.
796 pub fn compare_checkpoints(
797 &self,
798 left: GitStoreCheckpoint,
799 mut right: GitStoreCheckpoint,
800 cx: &mut App,
801 ) -> Task<Result<bool>> {
802 let repositories_by_work_dir_abs_path = self
803 .repositories
804 .values()
805 .map(|repo| (repo.read(cx).snapshot.work_directory_abs_path.clone(), repo))
806 .collect::<HashMap<_, _>>();
807
808 let mut tasks = Vec::new();
809 for (work_dir_abs_path, left_checkpoint) in left.checkpoints_by_work_dir_abs_path {
810 if let Some(right_checkpoint) = right
811 .checkpoints_by_work_dir_abs_path
812 .remove(&work_dir_abs_path)
813 {
814 if let Some(repository) = repositories_by_work_dir_abs_path.get(&work_dir_abs_path)
815 {
816 let compare = repository.update(cx, |repository, _| {
817 repository.compare_checkpoints(left_checkpoint, right_checkpoint)
818 });
819
820 tasks.push(async move { compare.await? });
821 }
822 } else {
823 return Task::ready(Ok(false));
824 }
825 }
826 cx.background_spawn(async move {
827 Ok(future::try_join_all(tasks)
828 .await?
829 .into_iter()
830 .all(|result| result))
831 })
832 }
833
834 pub fn delete_checkpoint(
835 &self,
836 checkpoint: GitStoreCheckpoint,
837 cx: &mut App,
838 ) -> Task<Result<()>> {
839 let repositories_by_work_directory_abs_path = self
840 .repositories
841 .values()
842 .map(|repo| (repo.read(cx).snapshot.work_directory_abs_path.clone(), repo))
843 .collect::<HashMap<_, _>>();
844
845 let mut tasks = Vec::new();
846 for (work_dir_abs_path, checkpoint) in checkpoint.checkpoints_by_work_dir_abs_path {
847 if let Some(repository) =
848 repositories_by_work_directory_abs_path.get(&work_dir_abs_path)
849 {
850 let delete = repository.update(cx, |this, _| this.delete_checkpoint(checkpoint));
851 tasks.push(async move { delete.await? });
852 }
853 }
854 cx.background_spawn(async move {
855 future::try_join_all(tasks).await?;
856 Ok(())
857 })
858 }
859
860 /// Blames a buffer.
861 pub fn blame_buffer(
862 &self,
863 buffer: &Entity<Buffer>,
864 version: Option<clock::Global>,
865 cx: &mut App,
866 ) -> Task<Result<Option<Blame>>> {
867 let buffer = buffer.read(cx);
868 let Some((repo, repo_path)) =
869 self.repository_and_path_for_buffer_id(buffer.remote_id(), cx)
870 else {
871 return Task::ready(Err(anyhow!("failed to find a git repository for buffer")));
872 };
873 let content = match &version {
874 Some(version) => buffer.rope_for_version(version).clone(),
875 None => buffer.as_rope().clone(),
876 };
877 let version = version.unwrap_or(buffer.version());
878 let buffer_id = buffer.remote_id();
879
880 let rx = repo.update(cx, |repo, _| {
881 repo.send_job(None, move |state, _| async move {
882 match state {
883 RepositoryState::Local { backend, .. } => backend
884 .blame(repo_path.clone(), content)
885 .await
886 .with_context(|| format!("Failed to blame {:?}", repo_path.0))
887 .map(Some),
888 RepositoryState::Remote { project_id, client } => {
889 let response = client
890 .request(proto::BlameBuffer {
891 project_id: project_id.to_proto(),
892 buffer_id: buffer_id.into(),
893 version: serialize_version(&version),
894 })
895 .await?;
896 Ok(deserialize_blame_buffer_response(response))
897 }
898 }
899 })
900 });
901
902 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
903 }
904
905 pub fn get_permalink_to_line(
906 &self,
907 buffer: &Entity<Buffer>,
908 selection: Range<u32>,
909 cx: &mut App,
910 ) -> Task<Result<url::Url>> {
911 let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
912 return Task::ready(Err(anyhow!("buffer has no file")));
913 };
914
915 let Some((repo, repo_path)) = self.repository_and_path_for_project_path(
916 &(file.worktree.read(cx).id(), file.path.clone()).into(),
917 cx,
918 ) else {
919 // If we're not in a Git repo, check whether this is a Rust source
920 // file in the Cargo registry (presumably opened with go-to-definition
921 // from a normal Rust file). If so, we can put together a permalink
922 // using crate metadata.
923 if buffer
924 .read(cx)
925 .language()
926 .is_none_or(|lang| lang.name() != "Rust".into())
927 {
928 return Task::ready(Err(anyhow!("no permalink available")));
929 }
930 let Some(file_path) = file.worktree.read(cx).absolutize(&file.path).ok() else {
931 return Task::ready(Err(anyhow!("no permalink available")));
932 };
933 return cx.spawn(async move |cx| {
934 let provider_registry = cx.update(GitHostingProviderRegistry::default_global)?;
935 get_permalink_in_rust_registry_src(provider_registry, file_path, selection)
936 .map_err(|_| anyhow!("no permalink available"))
937 });
938
939 // TODO remote case
940 };
941
942 let buffer_id = buffer.read(cx).remote_id();
943 let branch = repo.read(cx).branch.clone();
944 let remote = branch
945 .as_ref()
946 .and_then(|b| b.upstream.as_ref())
947 .and_then(|b| b.remote_name())
948 .unwrap_or("origin")
949 .to_string();
950
951 let rx = repo.update(cx, |repo, _| {
952 repo.send_job(None, move |state, cx| async move {
953 match state {
954 RepositoryState::Local { backend, .. } => {
955 let origin_url = backend
956 .remote_url(&remote)
957 .ok_or_else(|| anyhow!("remote \"{remote}\" not found"))?;
958
959 let sha = backend
960 .head_sha()
961 .ok_or_else(|| anyhow!("failed to read HEAD SHA"))?;
962
963 let provider_registry =
964 cx.update(GitHostingProviderRegistry::default_global)?;
965
966 let (provider, remote) =
967 parse_git_remote_url(provider_registry, &origin_url)
968 .ok_or_else(|| anyhow!("failed to parse Git remote URL"))?;
969
970 let path = repo_path
971 .to_str()
972 .ok_or_else(|| anyhow!("failed to convert path to string"))?;
973
974 Ok(provider.build_permalink(
975 remote,
976 BuildPermalinkParams {
977 sha: &sha,
978 path,
979 selection: Some(selection),
980 },
981 ))
982 }
983 RepositoryState::Remote { project_id, client } => {
984 let response = client
985 .request(proto::GetPermalinkToLine {
986 project_id: project_id.to_proto(),
987 buffer_id: buffer_id.into(),
988 selection: Some(proto::Range {
989 start: selection.start as u64,
990 end: selection.end as u64,
991 }),
992 })
993 .await?;
994
995 url::Url::parse(&response.permalink).context("failed to parse permalink")
996 }
997 }
998 })
999 });
1000 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
1001 }
1002
1003 fn downstream_client(&self) -> Option<(AnyProtoClient, ProjectId)> {
1004 match &self.state {
1005 GitStoreState::Local {
1006 downstream: downstream_client,
1007 ..
1008 } => downstream_client
1009 .as_ref()
1010 .map(|state| (state.client.clone(), state.project_id)),
1011 GitStoreState::Ssh {
1012 downstream: downstream_client,
1013 ..
1014 } => downstream_client.clone(),
1015 GitStoreState::Remote { .. } => None,
1016 }
1017 }
1018
1019 fn upstream_client(&self) -> Option<AnyProtoClient> {
1020 match &self.state {
1021 GitStoreState::Local { .. } => None,
1022 GitStoreState::Ssh {
1023 upstream_client, ..
1024 }
1025 | GitStoreState::Remote {
1026 upstream_client, ..
1027 } => Some(upstream_client.clone()),
1028 }
1029 }
1030
1031 fn on_worktree_store_event(
1032 &mut self,
1033 worktree_store: Entity<WorktreeStore>,
1034 event: &WorktreeStoreEvent,
1035 cx: &mut Context<Self>,
1036 ) {
1037 let GitStoreState::Local {
1038 project_environment,
1039 downstream,
1040 next_repository_id,
1041 fs,
1042 } = &self.state
1043 else {
1044 return;
1045 };
1046
1047 match event {
1048 WorktreeStoreEvent::WorktreeUpdatedEntries(worktree_id, updated_entries) => {
1049 let mut paths_by_git_repo = HashMap::<_, Vec<_>>::default();
1050 for (relative_path, _, _) in updated_entries.iter() {
1051 let Some((repo, repo_path)) = self.repository_and_path_for_project_path(
1052 &(*worktree_id, relative_path.clone()).into(),
1053 cx,
1054 ) else {
1055 continue;
1056 };
1057 paths_by_git_repo.entry(repo).or_default().push(repo_path)
1058 }
1059
1060 for (repo, paths) in paths_by_git_repo {
1061 repo.update(cx, |repo, cx| {
1062 repo.paths_changed(
1063 paths,
1064 downstream
1065 .as_ref()
1066 .map(|downstream| downstream.updates_tx.clone()),
1067 cx,
1068 );
1069 });
1070 }
1071 }
1072 WorktreeStoreEvent::WorktreeUpdatedGitRepositories(worktree_id, changed_repos) => {
1073 let Some(worktree) = worktree_store.read(cx).worktree_for_id(*worktree_id, cx)
1074 else {
1075 return;
1076 };
1077 if !worktree.read(cx).is_visible() {
1078 log::debug!(
1079 "not adding repositories for local worktree {:?} because it's not visible",
1080 worktree.read(cx).abs_path()
1081 );
1082 return;
1083 }
1084 self.update_repositories_from_worktree(
1085 project_environment.clone(),
1086 next_repository_id.clone(),
1087 downstream
1088 .as_ref()
1089 .map(|downstream| downstream.updates_tx.clone()),
1090 changed_repos.clone(),
1091 fs.clone(),
1092 cx,
1093 );
1094 self.local_worktree_git_repos_changed(worktree, changed_repos, cx);
1095 }
1096 _ => {}
1097 }
1098 }
1099
1100 fn on_repository_event(
1101 &mut self,
1102 repo: Entity<Repository>,
1103 event: &RepositoryEvent,
1104 cx: &mut Context<Self>,
1105 ) {
1106 let id = repo.read(cx).id;
1107 cx.emit(GitStoreEvent::RepositoryUpdated(
1108 id,
1109 event.clone(),
1110 self.active_repo_id == Some(id),
1111 ))
1112 }
1113
1114 fn on_jobs_updated(&mut self, _: Entity<Repository>, _: &JobsUpdated, cx: &mut Context<Self>) {
1115 cx.emit(GitStoreEvent::JobsUpdated)
1116 }
1117
1118 /// Updates our list of repositories and schedules git scans in response to a notification from a worktree.
1119 fn update_repositories_from_worktree(
1120 &mut self,
1121 project_environment: Entity<ProjectEnvironment>,
1122 next_repository_id: Arc<AtomicU64>,
1123 updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
1124 updated_git_repositories: UpdatedGitRepositoriesSet,
1125 fs: Arc<dyn Fs>,
1126 cx: &mut Context<Self>,
1127 ) {
1128 let mut removed_ids = Vec::new();
1129 for update in updated_git_repositories.iter() {
1130 if let Some((id, existing)) = self.repositories.iter().find(|(_, repo)| {
1131 let existing_work_directory_abs_path =
1132 repo.read(cx).work_directory_abs_path.clone();
1133 Some(&existing_work_directory_abs_path)
1134 == update.old_work_directory_abs_path.as_ref()
1135 || Some(&existing_work_directory_abs_path)
1136 == update.new_work_directory_abs_path.as_ref()
1137 }) {
1138 if let Some(new_work_directory_abs_path) =
1139 update.new_work_directory_abs_path.clone()
1140 {
1141 existing.update(cx, |existing, cx| {
1142 existing.snapshot.work_directory_abs_path = new_work_directory_abs_path;
1143 existing.schedule_scan(updates_tx.clone(), cx);
1144 });
1145 } else {
1146 removed_ids.push(*id);
1147 }
1148 } else if let UpdatedGitRepository {
1149 new_work_directory_abs_path: Some(work_directory_abs_path),
1150 dot_git_abs_path: Some(dot_git_abs_path),
1151 repository_dir_abs_path: Some(repository_dir_abs_path),
1152 common_dir_abs_path: Some(common_dir_abs_path),
1153 ..
1154 } = update
1155 {
1156 let id = RepositoryId(next_repository_id.fetch_add(1, atomic::Ordering::Release));
1157 let git_store = cx.weak_entity();
1158 let repo = cx.new(|cx| {
1159 let mut repo = Repository::local(
1160 id,
1161 work_directory_abs_path.clone(),
1162 dot_git_abs_path.clone(),
1163 repository_dir_abs_path.clone(),
1164 common_dir_abs_path.clone(),
1165 project_environment.downgrade(),
1166 fs.clone(),
1167 git_store,
1168 cx,
1169 );
1170 repo.schedule_scan(updates_tx.clone(), cx);
1171 repo
1172 });
1173 self._subscriptions
1174 .push(cx.subscribe(&repo, Self::on_repository_event));
1175 self._subscriptions
1176 .push(cx.subscribe(&repo, Self::on_jobs_updated));
1177 self.repositories.insert(id, repo);
1178 cx.emit(GitStoreEvent::RepositoryAdded(id));
1179 self.active_repo_id.get_or_insert_with(|| {
1180 cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
1181 id
1182 });
1183 }
1184 }
1185
1186 for id in removed_ids {
1187 if self.active_repo_id == Some(id) {
1188 self.active_repo_id = None;
1189 cx.emit(GitStoreEvent::ActiveRepositoryChanged(None));
1190 }
1191 self.repositories.remove(&id);
1192 if let Some(updates_tx) = updates_tx.as_ref() {
1193 updates_tx
1194 .unbounded_send(DownstreamUpdate::RemoveRepository(id))
1195 .ok();
1196 }
1197 }
1198 }
1199
1200 fn on_buffer_store_event(
1201 &mut self,
1202 _: Entity<BufferStore>,
1203 event: &BufferStoreEvent,
1204 cx: &mut Context<Self>,
1205 ) {
1206 match event {
1207 BufferStoreEvent::BufferAdded(buffer) => {
1208 cx.subscribe(&buffer, |this, buffer, event, cx| {
1209 if let BufferEvent::LanguageChanged = event {
1210 let buffer_id = buffer.read(cx).remote_id();
1211 if let Some(diff_state) = this.diffs.get(&buffer_id) {
1212 diff_state.update(cx, |diff_state, cx| {
1213 diff_state.buffer_language_changed(buffer, cx);
1214 });
1215 }
1216 }
1217 })
1218 .detach();
1219 }
1220 BufferStoreEvent::SharedBufferClosed(peer_id, buffer_id) => {
1221 if let Some(diffs) = self.shared_diffs.get_mut(peer_id) {
1222 diffs.remove(buffer_id);
1223 }
1224 }
1225 BufferStoreEvent::BufferDropped(buffer_id) => {
1226 self.diffs.remove(&buffer_id);
1227 for diffs in self.shared_diffs.values_mut() {
1228 diffs.remove(buffer_id);
1229 }
1230 }
1231
1232 _ => {}
1233 }
1234 }
1235
1236 pub fn recalculate_buffer_diffs(
1237 &mut self,
1238 buffers: Vec<Entity<Buffer>>,
1239 cx: &mut Context<Self>,
1240 ) -> impl Future<Output = ()> + use<> {
1241 let mut futures = Vec::new();
1242 for buffer in buffers {
1243 if let Some(diff_state) = self.diffs.get_mut(&buffer.read(cx).remote_id()) {
1244 let buffer = buffer.read(cx).text_snapshot();
1245 diff_state.update(cx, |diff_state, cx| {
1246 diff_state.recalculate_diffs(buffer, cx);
1247 futures.extend(diff_state.wait_for_recalculation());
1248 });
1249 }
1250 }
1251 async move {
1252 futures::future::join_all(futures).await;
1253 }
1254 }
1255
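/// When hunks are staged or unstaged via a [`BufferDiff`], bumps the buffer's
/// hunk-staging operation count and spawns a job that writes the new index
/// text; if the write fails, pending hunks are cleared and an
/// `IndexWriteError` event is emitted.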
1256 fn on_buffer_diff_event(
1257 &mut self,
1258 diff: Entity<buffer_diff::BufferDiff>,
1259 event: &BufferDiffEvent,
1260 cx: &mut Context<Self>,
1261 ) {
1262 if let BufferDiffEvent::HunksStagedOrUnstaged(new_index_text) = event {
1263 let buffer_id = diff.read(cx).buffer_id;
1264 if let Some(diff_state) = self.diffs.get(&buffer_id) {
1265 let hunk_staging_operation_count = diff_state.update(cx, |diff_state, _| {
1266 diff_state.hunk_staging_operation_count += 1;
1267 diff_state.hunk_staging_operation_count
1268 });
1269 if let Some((repo, path)) = self.repository_and_path_for_buffer_id(buffer_id, cx) {
1270 let recv = repo.update(cx, |repo, cx| {
1271 log::debug!("hunks changed for {}", path.display());
1272 repo.spawn_set_index_text_job(
1273 path,
1274 new_index_text.as_ref().map(|rope| rope.to_string()),
1275 Some(hunk_staging_operation_count),
1276 cx,
1277 )
1278 });
1279 let diff = diff.downgrade();
1280 cx.spawn(async move |this, cx| {
1281 if let Ok(Err(error)) = cx.background_spawn(recv).await {
1282 diff.update(cx, |diff, cx| {
1283 diff.clear_pending_hunks(cx);
1284 })
1285 .ok();
1286 this.update(cx, |_, cx| cx.emit(GitStoreEvent::IndexWriteError(error)))
1287 .ok();
1288 }
1289 })
1290 .detach();
1291 }
1292 }
1293 }
1294 }
1295
1296 fn local_worktree_git_repos_changed(
1297 &mut self,
1298 worktree: Entity<Worktree>,
1299 changed_repos: &UpdatedGitRepositoriesSet,
1300 cx: &mut Context<Self>,
1301 ) {
1302 log::debug!("local worktree repos changed");
1303 debug_assert!(worktree.read(cx).is_local());
1304
1305 for repository in self.repositories.values() {
1306 repository.update(cx, |repository, cx| {
1307 let repo_abs_path = &repository.work_directory_abs_path;
1308 if changed_repos.iter().any(|update| {
1309 update.old_work_directory_abs_path.as_ref() == Some(repo_abs_path)
1310 || update.new_work_directory_abs_path.as_ref() == Some(repo_abs_path)
1311 }) {
1312 repository.reload_buffer_diff_bases(cx);
1313 }
1314 });
1315 }
1316 }
1317
1318 pub fn repositories(&self) -> &HashMap<RepositoryId, Entity<Repository>> {
1319 &self.repositories
1320 }
1321
1322 pub fn status_for_buffer_id(&self, buffer_id: BufferId, cx: &App) -> Option<FileStatus> {
1323 let (repo, path) = self.repository_and_path_for_buffer_id(buffer_id, cx)?;
1324 let status = repo.read(cx).snapshot.status_for_path(&path)?;
1325 Some(status.status)
1326 }
1327
1328 pub fn repository_and_path_for_buffer_id(
1329 &self,
1330 buffer_id: BufferId,
1331 cx: &App,
1332 ) -> Option<(Entity<Repository>, RepoPath)> {
1333 let buffer = self.buffer_store.read(cx).get(buffer_id)?;
1334 let project_path = buffer.read(cx).project_path(cx)?;
1335 self.repository_and_path_for_project_path(&project_path, cx)
1336 }
1337
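/// Resolves a project path to the repository whose work directory contains it,
/// along with the corresponding repository-relative path. When repositories
/// are nested, the most deeply nested work directory wins.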
1338 pub fn repository_and_path_for_project_path(
1339 &self,
1340 path: &ProjectPath,
1341 cx: &App,
1342 ) -> Option<(Entity<Repository>, RepoPath)> {
1343 let abs_path = self.worktree_store.read(cx).absolutize(path, cx)?;
1344 self.repositories
1345 .values()
1346 .filter_map(|repo| {
1347 let repo_path = repo.read(cx).abs_path_to_repo_path(&abs_path)?;
1348 Some((repo.clone(), repo_path))
1349 })
1350 .max_by_key(|(repo, _)| repo.read(cx).work_directory_abs_path.clone())
1351 }
1352
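/// Initializes a new git repository at `path`, either directly via the local
/// filesystem or by forwarding a `GitInit` request to the upstream client.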
1353 pub fn git_init(
1354 &self,
1355 path: Arc<Path>,
1356 fallback_branch_name: String,
1357 cx: &App,
1358 ) -> Task<Result<()>> {
1359 match &self.state {
1360 GitStoreState::Local { fs, .. } => {
1361 let fs = fs.clone();
1362 cx.background_executor()
1363 .spawn(async move { fs.git_init(&path, fallback_branch_name) })
1364 }
1365 GitStoreState::Ssh {
1366 upstream_client,
1367 upstream_project_id: project_id,
1368 ..
1369 }
1370 | GitStoreState::Remote {
1371 upstream_client,
1372 upstream_project_id: project_id,
1373 ..
1374 } => {
1375 let client = upstream_client.clone();
1376 let project_id = *project_id;
1377 cx.background_executor().spawn(async move {
1378 client
1379 .request(proto::GitInit {
1380 project_id: project_id.0,
1381 abs_path: path.to_string_lossy().to_string(),
1382 fallback_branch_name,
1383 })
1384 .await?;
1385 Ok(())
1386 })
1387 }
1388 }
1389 }
1390
1391 async fn handle_update_repository(
1392 this: Entity<Self>,
1393 envelope: TypedEnvelope<proto::UpdateRepository>,
1394 mut cx: AsyncApp,
1395 ) -> Result<()> {
1396 this.update(&mut cx, |this, cx| {
1397 let mut update = envelope.payload;
1398
1399 let id = RepositoryId::from_proto(update.id);
1400 let client = this
1401 .upstream_client()
1402 .context("no upstream client")?
1403 .clone();
1404
1405 let mut is_new = false;
1406 let repo = this.repositories.entry(id).or_insert_with(|| {
1407 is_new = true;
1408 let git_store = cx.weak_entity();
1409 cx.new(|cx| {
1410 Repository::remote(
1411 id,
1412 Path::new(&update.abs_path).into(),
1413 ProjectId(update.project_id),
1414 client,
1415 git_store,
1416 cx,
1417 )
1418 })
1419 });
1420 if is_new {
1421 this._subscriptions
1422 .push(cx.subscribe(&repo, Self::on_repository_event))
1423 }
1424
1425 repo.update(cx, {
1426 let update = update.clone();
1427 |repo, cx| repo.apply_remote_update(update, cx)
1428 })?;
1429
1430 this.active_repo_id.get_or_insert_with(|| {
1431 cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
1432 id
1433 });
1434
1435 if let Some((client, project_id)) = this.downstream_client() {
1436 update.project_id = project_id.to_proto();
1437 client.send(update).log_err();
1438 }
1439 Ok(())
1440 })?
1441 }
1442
1443 async fn handle_remove_repository(
1444 this: Entity<Self>,
1445 envelope: TypedEnvelope<proto::RemoveRepository>,
1446 mut cx: AsyncApp,
1447 ) -> Result<()> {
1448 this.update(&mut cx, |this, cx| {
1449 let mut update = envelope.payload;
1450 let id = RepositoryId::from_proto(update.id);
1451 this.repositories.remove(&id);
1452 if let Some((client, project_id)) = this.downstream_client() {
1453 update.project_id = project_id.to_proto();
1454 client.send(update).log_err();
1455 }
1456 if this.active_repo_id == Some(id) {
1457 this.active_repo_id = None;
1458 cx.emit(GitStoreEvent::ActiveRepositoryChanged(None));
1459 }
1460 cx.emit(GitStoreEvent::RepositoryRemoved(id));
1461 })
1462 }
1463
1464 async fn handle_git_init(
1465 this: Entity<Self>,
1466 envelope: TypedEnvelope<proto::GitInit>,
1467 cx: AsyncApp,
1468 ) -> Result<proto::Ack> {
1469 let path: Arc<Path> = PathBuf::from(envelope.payload.abs_path).into();
1470 let name = envelope.payload.fallback_branch_name;
1471 cx.update(|cx| this.read(cx).git_init(path, name, cx))?
1472 .await?;
1473
1474 Ok(proto::Ack {})
1475 }
1476
1477 async fn handle_fetch(
1478 this: Entity<Self>,
1479 envelope: TypedEnvelope<proto::Fetch>,
1480 mut cx: AsyncApp,
1481 ) -> Result<proto::RemoteMessageResponse> {
1482 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1483 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1484 let askpass_id = envelope.payload.askpass_id;
1485
1486 let askpass = make_remote_delegate(
1487 this,
1488 envelope.payload.project_id,
1489 repository_id,
1490 askpass_id,
1491 &mut cx,
1492 );
1493
1494 let remote_output = repository_handle
1495 .update(&mut cx, |repository_handle, cx| {
1496 repository_handle.fetch(askpass, cx)
1497 })?
1498 .await??;
1499
1500 Ok(proto::RemoteMessageResponse {
1501 stdout: remote_output.stdout,
1502 stderr: remote_output.stderr,
1503 })
1504 }
1505
1506 async fn handle_push(
1507 this: Entity<Self>,
1508 envelope: TypedEnvelope<proto::Push>,
1509 mut cx: AsyncApp,
1510 ) -> Result<proto::RemoteMessageResponse> {
1511 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1512 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1513
1514 let askpass_id = envelope.payload.askpass_id;
1515 let askpass = make_remote_delegate(
1516 this,
1517 envelope.payload.project_id,
1518 repository_id,
1519 askpass_id,
1520 &mut cx,
1521 );
1522
1523 let options = envelope
1524 .payload
1525 .options
1526 .as_ref()
1527 .map(|_| match envelope.payload.options() {
1528 proto::push::PushOptions::SetUpstream => git::repository::PushOptions::SetUpstream,
1529 proto::push::PushOptions::Force => git::repository::PushOptions::Force,
1530 });
1531
1532 let branch_name = envelope.payload.branch_name.into();
1533 let remote_name = envelope.payload.remote_name.into();
1534
1535 let remote_output = repository_handle
1536 .update(&mut cx, |repository_handle, cx| {
1537 repository_handle.push(branch_name, remote_name, options, askpass, cx)
1538 })?
1539 .await??;
1540 Ok(proto::RemoteMessageResponse {
1541 stdout: remote_output.stdout,
1542 stderr: remote_output.stderr,
1543 })
1544 }
1545
1546 async fn handle_pull(
1547 this: Entity<Self>,
1548 envelope: TypedEnvelope<proto::Pull>,
1549 mut cx: AsyncApp,
1550 ) -> Result<proto::RemoteMessageResponse> {
1551 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1552 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1553 let askpass_id = envelope.payload.askpass_id;
1554 let askpass = make_remote_delegate(
1555 this,
1556 envelope.payload.project_id,
1557 repository_id,
1558 askpass_id,
1559 &mut cx,
1560 );
1561
1562 let branch_name = envelope.payload.branch_name.into();
1563 let remote_name = envelope.payload.remote_name.into();
1564
1565 let remote_message = repository_handle
1566 .update(&mut cx, |repository_handle, cx| {
1567 repository_handle.pull(branch_name, remote_name, askpass, cx)
1568 })?
1569 .await??;
1570
1571 Ok(proto::RemoteMessageResponse {
1572 stdout: remote_message.stdout,
1573 stderr: remote_message.stderr,
1574 })
1575 }
1576
1577 async fn handle_stage(
1578 this: Entity<Self>,
1579 envelope: TypedEnvelope<proto::Stage>,
1580 mut cx: AsyncApp,
1581 ) -> Result<proto::Ack> {
1582 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1583 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1584
1585 let entries = envelope
1586 .payload
1587 .paths
1588 .into_iter()
1589 .map(PathBuf::from)
1590 .map(RepoPath::new)
1591 .collect();
1592
1593 repository_handle
1594 .update(&mut cx, |repository_handle, cx| {
1595 repository_handle.stage_entries(entries, cx)
1596 })?
1597 .await?;
1598 Ok(proto::Ack {})
1599 }
1600
1601 async fn handle_unstage(
1602 this: Entity<Self>,
1603 envelope: TypedEnvelope<proto::Unstage>,
1604 mut cx: AsyncApp,
1605 ) -> Result<proto::Ack> {
1606 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1607 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1608
1609 let entries = envelope
1610 .payload
1611 .paths
1612 .into_iter()
1613 .map(PathBuf::from)
1614 .map(RepoPath::new)
1615 .collect();
1616
1617 repository_handle
1618 .update(&mut cx, |repository_handle, cx| {
1619 repository_handle.unstage_entries(entries, cx)
1620 })?
1621 .await?;
1622
1623 Ok(proto::Ack {})
1624 }
1625
1626 async fn handle_set_index_text(
1627 this: Entity<Self>,
1628 envelope: TypedEnvelope<proto::SetIndexText>,
1629 mut cx: AsyncApp,
1630 ) -> Result<proto::Ack> {
1631 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1632 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1633 let repo_path = RepoPath::from_str(&envelope.payload.path);
1634
1635 repository_handle
1636 .update(&mut cx, |repository_handle, cx| {
1637 repository_handle.spawn_set_index_text_job(
1638 repo_path,
1639 envelope.payload.text,
1640 None,
1641 cx,
1642 )
1643 })?
1644 .await??;
1645 Ok(proto::Ack {})
1646 }
1647
1648 async fn handle_commit(
1649 this: Entity<Self>,
1650 envelope: TypedEnvelope<proto::Commit>,
1651 mut cx: AsyncApp,
1652 ) -> Result<proto::Ack> {
1653 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1654 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1655
1656 let message = SharedString::from(envelope.payload.message);
1657 let name = envelope.payload.name.map(SharedString::from);
1658 let email = envelope.payload.email.map(SharedString::from);
1659
1660 repository_handle
1661 .update(&mut cx, |repository_handle, cx| {
1662 repository_handle.commit(message, name.zip(email), cx)
1663 })?
1664 .await??;
1665 Ok(proto::Ack {})
1666 }
1667
1668 async fn handle_get_remotes(
1669 this: Entity<Self>,
1670 envelope: TypedEnvelope<proto::GetRemotes>,
1671 mut cx: AsyncApp,
1672 ) -> Result<proto::GetRemotesResponse> {
1673 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1674 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1675
1676 let branch_name = envelope.payload.branch_name;
1677
1678 let remotes = repository_handle
1679 .update(&mut cx, |repository_handle, _| {
1680 repository_handle.get_remotes(branch_name)
1681 })?
1682 .await??;
1683
1684 Ok(proto::GetRemotesResponse {
1685 remotes: remotes
1686 .into_iter()
1687 .map(|remotes| proto::get_remotes_response::Remote {
1688 name: remotes.name.to_string(),
1689 })
1690 .collect::<Vec<_>>(),
1691 })
1692 }
1693
1694 async fn handle_get_branches(
1695 this: Entity<Self>,
1696 envelope: TypedEnvelope<proto::GitGetBranches>,
1697 mut cx: AsyncApp,
1698 ) -> Result<proto::GitBranchesResponse> {
1699 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1700 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1701
1702 let branches = repository_handle
1703 .update(&mut cx, |repository_handle, _| repository_handle.branches())?
1704 .await??;
1705
1706 Ok(proto::GitBranchesResponse {
1707 branches: branches
1708 .into_iter()
1709 .map(|branch| branch_to_proto(&branch))
1710 .collect::<Vec<_>>(),
1711 })
1712 }
1713 async fn handle_create_branch(
1714 this: Entity<Self>,
1715 envelope: TypedEnvelope<proto::GitCreateBranch>,
1716 mut cx: AsyncApp,
1717 ) -> Result<proto::Ack> {
1718 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1719 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1720 let branch_name = envelope.payload.branch_name;
1721
1722 repository_handle
1723 .update(&mut cx, |repository_handle, _| {
1724 repository_handle.create_branch(branch_name)
1725 })?
1726 .await??;
1727
1728 Ok(proto::Ack {})
1729 }
1730
1731 async fn handle_change_branch(
1732 this: Entity<Self>,
1733 envelope: TypedEnvelope<proto::GitChangeBranch>,
1734 mut cx: AsyncApp,
1735 ) -> Result<proto::Ack> {
1736 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1737 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1738 let branch_name = envelope.payload.branch_name;
1739
1740 repository_handle
1741 .update(&mut cx, |repository_handle, _| {
1742 repository_handle.change_branch(branch_name)
1743 })?
1744 .await??;
1745
1746 Ok(proto::Ack {})
1747 }
1748
1749 async fn handle_show(
1750 this: Entity<Self>,
1751 envelope: TypedEnvelope<proto::GitShow>,
1752 mut cx: AsyncApp,
1753 ) -> Result<proto::GitCommitDetails> {
1754 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1755 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1756
1757 let commit = repository_handle
1758 .update(&mut cx, |repository_handle, _| {
1759 repository_handle.show(envelope.payload.commit)
1760 })?
1761 .await??;
1762 Ok(proto::GitCommitDetails {
1763 sha: commit.sha.into(),
1764 message: commit.message.into(),
1765 commit_timestamp: commit.commit_timestamp,
1766 author_email: commit.author_email.into(),
1767 author_name: commit.author_name.into(),
1768 })
1769 }
1770
1771 async fn handle_load_commit_diff(
1772 this: Entity<Self>,
1773 envelope: TypedEnvelope<proto::LoadCommitDiff>,
1774 mut cx: AsyncApp,
1775 ) -> Result<proto::LoadCommitDiffResponse> {
1776 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1777 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1778
1779 let commit_diff = repository_handle
1780 .update(&mut cx, |repository_handle, _| {
1781 repository_handle.load_commit_diff(envelope.payload.commit)
1782 })?
1783 .await??;
1784 Ok(proto::LoadCommitDiffResponse {
1785 files: commit_diff
1786 .files
1787 .into_iter()
1788 .map(|file| proto::CommitFile {
1789 path: file.path.to_string(),
1790 old_text: file.old_text,
1791 new_text: file.new_text,
1792 })
1793 .collect(),
1794 })
1795 }
1796
1797 async fn handle_reset(
1798 this: Entity<Self>,
1799 envelope: TypedEnvelope<proto::GitReset>,
1800 mut cx: AsyncApp,
1801 ) -> Result<proto::Ack> {
1802 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1803 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1804
1805 let mode = match envelope.payload.mode() {
1806 git_reset::ResetMode::Soft => ResetMode::Soft,
1807 git_reset::ResetMode::Mixed => ResetMode::Mixed,
1808 };
1809
1810 repository_handle
1811 .update(&mut cx, |repository_handle, cx| {
1812 repository_handle.reset(envelope.payload.commit, mode, cx)
1813 })?
1814 .await??;
1815 Ok(proto::Ack {})
1816 }
1817
1818 async fn handle_checkout_files(
1819 this: Entity<Self>,
1820 envelope: TypedEnvelope<proto::GitCheckoutFiles>,
1821 mut cx: AsyncApp,
1822 ) -> Result<proto::Ack> {
1823 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1824 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1825 let paths = envelope
1826 .payload
1827 .paths
1828 .iter()
1829 .map(|s| RepoPath::from_str(s))
1830 .collect();
1831
1832 repository_handle
1833 .update(&mut cx, |repository_handle, cx| {
1834 repository_handle.checkout_files(&envelope.payload.commit, paths, cx)
1835 })?
1836 .await??;
1837 Ok(proto::Ack {})
1838 }
1839
1840 async fn handle_open_commit_message_buffer(
1841 this: Entity<Self>,
1842 envelope: TypedEnvelope<proto::OpenCommitMessageBuffer>,
1843 mut cx: AsyncApp,
1844 ) -> Result<proto::OpenBufferResponse> {
1845 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1846 let repository = Self::repository_for_request(&this, repository_id, &mut cx)?;
1847 let buffer = repository
1848 .update(&mut cx, |repository, cx| {
1849 repository.open_commit_buffer(None, this.read(cx).buffer_store.clone(), cx)
1850 })?
1851 .await?;
1852
1853 let buffer_id = buffer.read_with(&cx, |buffer, _| buffer.remote_id())?;
1854 this.update(&mut cx, |this, cx| {
1855 this.buffer_store.update(cx, |buffer_store, cx| {
1856 buffer_store
1857 .create_buffer_for_peer(
1858 &buffer,
1859 envelope.original_sender_id.unwrap_or(envelope.sender_id),
1860 cx,
1861 )
1862 .detach_and_log_err(cx);
1863 })
1864 })?;
1865
1866 Ok(proto::OpenBufferResponse {
1867 buffer_id: buffer_id.to_proto(),
1868 })
1869 }
1870
1871 async fn handle_askpass(
1872 this: Entity<Self>,
1873 envelope: TypedEnvelope<proto::AskPassRequest>,
1874 mut cx: AsyncApp,
1875 ) -> Result<proto::AskPassResponse> {
1876 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1877 let repository = Self::repository_for_request(&this, repository_id, &mut cx)?;
1878
1879 let delegates = cx.update(|cx| repository.read(cx).askpass_delegates.clone())?;
1880 let Some(mut askpass) = delegates.lock().remove(&envelope.payload.askpass_id) else {
1881 debug_panic!("no askpass found");
1882 return Err(anyhow::anyhow!("no askpass found"));
1883 };
1884
1885 let response = askpass.ask_password(envelope.payload.prompt).await?;
1886
1887 delegates
1888 .lock()
1889 .insert(envelope.payload.askpass_id, askpass);
1890
1891 Ok(proto::AskPassResponse { response })
1892 }
1893
1894 async fn handle_check_for_pushed_commits(
1895 this: Entity<Self>,
1896 envelope: TypedEnvelope<proto::CheckForPushedCommits>,
1897 mut cx: AsyncApp,
1898 ) -> Result<proto::CheckForPushedCommitsResponse> {
1899 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1900 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1901
1902 let branches = repository_handle
1903 .update(&mut cx, |repository_handle, _| {
1904 repository_handle.check_for_pushed_commits()
1905 })?
1906 .await??;
1907 Ok(proto::CheckForPushedCommitsResponse {
1908 pushed_to: branches
1909 .into_iter()
1910 .map(|commit| commit.to_string())
1911 .collect(),
1912 })
1913 }
1914
1915 async fn handle_git_diff(
1916 this: Entity<Self>,
1917 envelope: TypedEnvelope<proto::GitDiff>,
1918 mut cx: AsyncApp,
1919 ) -> Result<proto::GitDiffResponse> {
1920 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1921 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1922 let diff_type = match envelope.payload.diff_type() {
1923 proto::git_diff::DiffType::HeadToIndex => DiffType::HeadToIndex,
1924 proto::git_diff::DiffType::HeadToWorktree => DiffType::HeadToWorktree,
1925 };
1926
1927 let mut diff = repository_handle
1928 .update(&mut cx, |repository_handle, cx| {
1929 repository_handle.diff(diff_type, cx)
1930 })?
1931 .await??;
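        // Cap the payload sent back over RPC. Note that `take(ONE_MB)` counts
        // characters, so the truncated string's byte length can still slightly
        // exceed one megabyte for multi-byte text.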
1932 const ONE_MB: usize = 1_000_000;
1933 if diff.len() > ONE_MB {
1934 diff = diff.chars().take(ONE_MB).collect()
1935 }
1936
1937 Ok(proto::GitDiffResponse { diff })
1938 }
1939
1940 async fn handle_open_unstaged_diff(
1941 this: Entity<Self>,
1942 request: TypedEnvelope<proto::OpenUnstagedDiff>,
1943 mut cx: AsyncApp,
1944 ) -> Result<proto::OpenUnstagedDiffResponse> {
1945 let buffer_id = BufferId::new(request.payload.buffer_id)?;
1946 let diff = this
1947 .update(&mut cx, |this, cx| {
1948 let buffer = this.buffer_store.read(cx).get(buffer_id)?;
1949 Some(this.open_unstaged_diff(buffer, cx))
1950 })?
1951 .ok_or_else(|| anyhow!("no such buffer"))?
1952 .await?;
1953 this.update(&mut cx, |this, _| {
1954 let shared_diffs = this
1955 .shared_diffs
1956 .entry(request.original_sender_id.unwrap_or(request.sender_id))
1957 .or_default();
1958 shared_diffs.entry(buffer_id).or_default().unstaged = Some(diff.clone());
1959 })?;
1960 let staged_text = diff.read_with(&cx, |diff, _| diff.base_text_string())?;
1961 Ok(proto::OpenUnstagedDiffResponse { staged_text })
1962 }
1963
1964 async fn handle_open_uncommitted_diff(
1965 this: Entity<Self>,
1966 request: TypedEnvelope<proto::OpenUncommittedDiff>,
1967 mut cx: AsyncApp,
1968 ) -> Result<proto::OpenUncommittedDiffResponse> {
1969 let buffer_id = BufferId::new(request.payload.buffer_id)?;
1970 let diff = this
1971 .update(&mut cx, |this, cx| {
1972 let buffer = this.buffer_store.read(cx).get(buffer_id)?;
1973 Some(this.open_uncommitted_diff(buffer, cx))
1974 })?
1975 .ok_or_else(|| anyhow!("no such buffer"))?
1976 .await?;
1977 this.update(&mut cx, |this, _| {
1978 let shared_diffs = this
1979 .shared_diffs
1980 .entry(request.original_sender_id.unwrap_or(request.sender_id))
1981 .or_default();
1982 shared_diffs.entry(buffer_id).or_default().uncommitted = Some(diff.clone());
1983 })?;
1984 diff.read_with(&cx, |diff, cx| {
1985 use proto::open_uncommitted_diff_response::Mode;
1986
1987 let unstaged_diff = diff.secondary_diff();
1988 let index_snapshot = unstaged_diff.and_then(|diff| {
1989 let diff = diff.read(cx);
1990 diff.base_text_exists().then(|| diff.base_text())
1991 });
1992
1993 let mode;
1994 let staged_text;
1995 let committed_text;
1996 if diff.base_text_exists() {
1997 let committed_snapshot = diff.base_text();
1998 committed_text = Some(committed_snapshot.text());
1999 if let Some(index_text) = index_snapshot {
2000 if index_text.remote_id() == committed_snapshot.remote_id() {
2001 mode = Mode::IndexMatchesHead;
2002 staged_text = None;
2003 } else {
2004 mode = Mode::IndexAndHead;
2005 staged_text = Some(index_text.text());
2006 }
2007 } else {
2008 mode = Mode::IndexAndHead;
2009 staged_text = None;
2010 }
2011 } else {
2012 mode = Mode::IndexAndHead;
2013 committed_text = None;
2014 staged_text = index_snapshot.as_ref().map(|buffer| buffer.text());
2015 }
2016
2017 proto::OpenUncommittedDiffResponse {
2018 committed_text,
2019 staged_text,
2020 mode: mode.into(),
2021 }
2022 })
2023 }
2024
2025 async fn handle_update_diff_bases(
2026 this: Entity<Self>,
2027 request: TypedEnvelope<proto::UpdateDiffBases>,
2028 mut cx: AsyncApp,
2029 ) -> Result<()> {
2030 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2031 this.update(&mut cx, |this, cx| {
2032 if let Some(diff_state) = this.diffs.get_mut(&buffer_id) {
2033 if let Some(buffer) = this.buffer_store.read(cx).get(buffer_id) {
2034 let buffer = buffer.read(cx).text_snapshot();
2035 diff_state.update(cx, |diff_state, cx| {
2036 diff_state.handle_base_texts_updated(buffer, request.payload, cx);
2037 })
2038 }
2039 }
2040 })
2041 }
2042
2043 async fn handle_blame_buffer(
2044 this: Entity<Self>,
2045 envelope: TypedEnvelope<proto::BlameBuffer>,
2046 mut cx: AsyncApp,
2047 ) -> Result<proto::BlameBufferResponse> {
2048 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2049 let version = deserialize_version(&envelope.payload.version);
2050 let buffer = this.read_with(&cx, |this, cx| {
2051 this.buffer_store.read(cx).get_existing(buffer_id)
2052 })??;
2053 buffer
2054 .update(&mut cx, |buffer, _| {
2055 buffer.wait_for_version(version.clone())
2056 })?
2057 .await?;
2058 let blame = this
2059 .update(&mut cx, |this, cx| {
2060 this.blame_buffer(&buffer, Some(version), cx)
2061 })?
2062 .await?;
2063 Ok(serialize_blame_buffer_response(blame))
2064 }
2065
2066 async fn handle_get_permalink_to_line(
2067 this: Entity<Self>,
2068 envelope: TypedEnvelope<proto::GetPermalinkToLine>,
2069 mut cx: AsyncApp,
2070 ) -> Result<proto::GetPermalinkToLineResponse> {
2071 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2072 // let version = deserialize_version(&envelope.payload.version);
2073 let selection = {
2074 let proto_selection = envelope
2075 .payload
2076 .selection
2077 .context("no selection to get permalink for defined")?;
2078 proto_selection.start as u32..proto_selection.end as u32
2079 };
2080 let buffer = this.read_with(&cx, |this, cx| {
2081 this.buffer_store.read(cx).get_existing(buffer_id)
2082 })??;
2083 let permalink = this
2084 .update(&mut cx, |this, cx| {
2085 this.get_permalink_to_line(&buffer, selection, cx)
2086 })?
2087 .await?;
2088 Ok(proto::GetPermalinkToLineResponse {
2089 permalink: permalink.to_string(),
2090 })
2091 }
2092
2093 fn repository_for_request(
2094 this: &Entity<Self>,
2095 id: RepositoryId,
2096 cx: &mut AsyncApp,
2097 ) -> Result<Entity<Repository>> {
2098 this.update(cx, |this, _| {
2099 this.repositories
2100 .get(&id)
2101 .context("missing repository handle")
2102 .cloned()
2103 })?
2104 }
2105
2106 pub fn repo_snapshots(&self, cx: &App) -> HashMap<RepositoryId, RepositorySnapshot> {
2107 self.repositories
2108 .iter()
2109 .map(|(id, repo)| (*id, repo.read(cx).snapshot.clone()))
2110 .collect()
2111 }
2112}
2113
2114impl BufferDiffState {
2115 fn buffer_language_changed(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) {
2116 self.language = buffer.read(cx).language().cloned();
2117 self.language_changed = true;
2118 let _ = self.recalculate_diffs(buffer.read(cx).text_snapshot(), cx);
2119 }
2120
2121 fn unstaged_diff(&self) -> Option<Entity<BufferDiff>> {
2122 self.unstaged_diff.as_ref().and_then(|set| set.upgrade())
2123 }
2124
2125 fn uncommitted_diff(&self) -> Option<Entity<BufferDiff>> {
2126 self.uncommitted_diff.as_ref().and_then(|set| set.upgrade())
2127 }
2128
2129 fn handle_base_texts_updated(
2130 &mut self,
2131 buffer: text::BufferSnapshot,
2132 message: proto::UpdateDiffBases,
2133 cx: &mut Context<Self>,
2134 ) {
2135 use proto::update_diff_bases::Mode;
2136
2137 let Some(mode) = Mode::from_i32(message.mode) else {
2138 return;
2139 };
2140
2141 let diff_bases_change = match mode {
2142 Mode::HeadOnly => DiffBasesChange::SetHead(message.committed_text),
2143 Mode::IndexOnly => DiffBasesChange::SetIndex(message.staged_text),
2144 Mode::IndexMatchesHead => DiffBasesChange::SetBoth(message.committed_text),
2145 Mode::IndexAndHead => DiffBasesChange::SetEach {
2146 index: message.staged_text,
2147 head: message.committed_text,
2148 },
2149 };
2150
2151 self.diff_bases_changed(buffer, Some(diff_bases_change), cx);
2152 }
2153
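    /// Returns a future that resolves once the in-flight diff recalculation
    /// finishes, or `None` if no recalculation is currently running.
    ///
    /// A minimal usage sketch (illustrative only; it assumes an async task
    /// that holds an `Entity<BufferDiffState>` named `diff_state`):
    ///
    /// ```ignore
    /// if let Some(recalculated) =
    ///     diff_state.update(cx, |state, _| state.wait_for_recalculation())?
    /// {
    ///     recalculated.await;
    /// }
    /// ```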
2154 pub fn wait_for_recalculation(&mut self) -> Option<impl Future<Output = ()> + use<>> {
2155 if *self.recalculating_tx.borrow() {
2156 let mut rx = self.recalculating_tx.subscribe();
2157 return Some(async move {
2158 loop {
2159 let is_recalculating = rx.recv().await;
2160 if is_recalculating != Some(true) {
2161 break;
2162 }
2163 }
2164 });
2165 } else {
2166 None
2167 }
2168 }
2169
2170 fn diff_bases_changed(
2171 &mut self,
2172 buffer: text::BufferSnapshot,
2173 diff_bases_change: Option<DiffBasesChange>,
2174 cx: &mut Context<Self>,
2175 ) {
2176 match diff_bases_change {
2177 Some(DiffBasesChange::SetIndex(index)) => {
2178 self.index_text = index.map(|mut index| {
2179 text::LineEnding::normalize(&mut index);
2180 Arc::new(index)
2181 });
2182 self.index_changed = true;
2183 }
2184 Some(DiffBasesChange::SetHead(head)) => {
2185 self.head_text = head.map(|mut head| {
2186 text::LineEnding::normalize(&mut head);
2187 Arc::new(head)
2188 });
2189 self.head_changed = true;
2190 }
2191 Some(DiffBasesChange::SetBoth(text)) => {
2192 let text = text.map(|mut text| {
2193 text::LineEnding::normalize(&mut text);
2194 Arc::new(text)
2195 });
2196 self.head_text = text.clone();
2197 self.index_text = text;
2198 self.head_changed = true;
2199 self.index_changed = true;
2200 }
2201 Some(DiffBasesChange::SetEach { index, head }) => {
2202 self.index_text = index.map(|mut index| {
2203 text::LineEnding::normalize(&mut index);
2204 Arc::new(index)
2205 });
2206 self.index_changed = true;
2207 self.head_text = head.map(|mut head| {
2208 text::LineEnding::normalize(&mut head);
2209 Arc::new(head)
2210 });
2211 self.head_changed = true;
2212 }
2213 None => {}
2214 }
2215
2216 self.recalculate_diffs(buffer, cx)
2217 }
2218
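    /// Recomputes the unstaged and uncommitted diffs against the current
    /// `index_text` and `head_text`, reusing the unstaged snapshot for the
    /// uncommitted diff when the index matches HEAD. The spawned task bails
    /// out if new hunk staging operations started after these base texts were
    /// read; a later recalculation will run once the index settles.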
2219 fn recalculate_diffs(&mut self, buffer: text::BufferSnapshot, cx: &mut Context<Self>) {
2220 *self.recalculating_tx.borrow_mut() = true;
2221
2222 let language = self.language.clone();
2223 let language_registry = self.language_registry.clone();
2224 let unstaged_diff = self.unstaged_diff();
2225 let uncommitted_diff = self.uncommitted_diff();
2226 let head = self.head_text.clone();
2227 let index = self.index_text.clone();
2228 let index_changed = self.index_changed;
2229 let head_changed = self.head_changed;
2230 let language_changed = self.language_changed;
2231 let prev_hunk_staging_operation_count = self.hunk_staging_operation_count_as_of_write;
2232 let index_matches_head = match (self.index_text.as_ref(), self.head_text.as_ref()) {
2233 (Some(index), Some(head)) => Arc::ptr_eq(index, head),
2234 (None, None) => true,
2235 _ => false,
2236 };
2237 self.recalculate_diff_task = Some(cx.spawn(async move |this, cx| {
2238 log::debug!(
2239 "start recalculating diffs for buffer {}",
2240 buffer.remote_id()
2241 );
2242
2243 let mut new_unstaged_diff = None;
2244 if let Some(unstaged_diff) = &unstaged_diff {
2245 new_unstaged_diff = Some(
2246 BufferDiff::update_diff(
2247 unstaged_diff.clone(),
2248 buffer.clone(),
2249 index,
2250 index_changed,
2251 language_changed,
2252 language.clone(),
2253 language_registry.clone(),
2254 cx,
2255 )
2256 .await?,
2257 );
2258 }
2259
2260 let mut new_uncommitted_diff = None;
2261 if let Some(uncommitted_diff) = &uncommitted_diff {
2262 new_uncommitted_diff = if index_matches_head {
2263 new_unstaged_diff.clone()
2264 } else {
2265 Some(
2266 BufferDiff::update_diff(
2267 uncommitted_diff.clone(),
2268 buffer.clone(),
2269 head,
2270 head_changed,
2271 language_changed,
2272 language.clone(),
2273 language_registry.clone(),
2274 cx,
2275 )
2276 .await?,
2277 )
2278 }
2279 }
2280
2281 let cancel = this.update(cx, |this, _| {
2282 // This checks whether all pending stage/unstage operations
2283 // have quiesced (i.e. both the corresponding write and the
2284 // read of that write have completed). If not, then we cancel
2285 // this recalculation attempt to avoid invalidating pending
2286 // state too quickly; another recalculation will come along
2287 // later and clear the pending state once the state of the index has settled.
2288 if this.hunk_staging_operation_count > prev_hunk_staging_operation_count {
2289 *this.recalculating_tx.borrow_mut() = false;
2290 true
2291 } else {
2292 false
2293 }
2294 })?;
2295 if cancel {
2296 log::debug!(
2297 concat!(
2298 "aborting recalculating diffs for buffer {}",
2299 "due to subsequent hunk operations",
2300 ),
2301 buffer.remote_id()
2302 );
2303 return Ok(());
2304 }
2305
2306 let unstaged_changed_range = if let Some((unstaged_diff, new_unstaged_diff)) =
2307 unstaged_diff.as_ref().zip(new_unstaged_diff.clone())
2308 {
2309 unstaged_diff.update(cx, |diff, cx| {
2310 if language_changed {
2311 diff.language_changed(cx);
2312 }
2313 diff.set_snapshot(new_unstaged_diff, &buffer, cx)
2314 })?
2315 } else {
2316 None
2317 };
2318
2319 if let Some((uncommitted_diff, new_uncommitted_diff)) =
2320 uncommitted_diff.as_ref().zip(new_uncommitted_diff.clone())
2321 {
2322 uncommitted_diff.update(cx, |diff, cx| {
2323 if language_changed {
2324 diff.language_changed(cx);
2325 }
2326 diff.set_snapshot_with_secondary(
2327 new_uncommitted_diff,
2328 &buffer,
2329 unstaged_changed_range,
2330 true,
2331 cx,
2332 );
2333 })?;
2334 }
2335
2336 log::debug!(
2337 "finished recalculating diffs for buffer {}",
2338 buffer.remote_id()
2339 );
2340
2341 if let Some(this) = this.upgrade() {
2342 this.update(cx, |this, _| {
2343 this.index_changed = false;
2344 this.head_changed = false;
2345 this.language_changed = false;
2346 *this.recalculating_tx.borrow_mut() = false;
2347 })?;
2348 }
2349
2350 Ok(())
2351 }));
2352 }
2353}
2354
2355impl Default for BufferDiffState {
2356 fn default() -> Self {
2357 Self {
2358 unstaged_diff: Default::default(),
2359 uncommitted_diff: Default::default(),
2360 recalculate_diff_task: Default::default(),
2361 language: Default::default(),
2362 language_registry: Default::default(),
2363 recalculating_tx: postage::watch::channel_with(false).0,
2364 hunk_staging_operation_count: 0,
2365 hunk_staging_operation_count_as_of_write: 0,
2366 head_text: Default::default(),
2367 index_text: Default::default(),
2368 head_changed: Default::default(),
2369 index_changed: Default::default(),
2370 language_changed: Default::default(),
2371 }
2372 }
2373}
2374
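/// Builds an [`AskPassDelegate`] that forwards credential prompts to the
/// downstream client over RPC (as a `proto::AskPassRequest`) and relays the
/// client's response back through the delegate's channel.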
2375fn make_remote_delegate(
2376 this: Entity<GitStore>,
2377 project_id: u64,
2378 repository_id: RepositoryId,
2379 askpass_id: u64,
2380 cx: &mut AsyncApp,
2381) -> AskPassDelegate {
2382 AskPassDelegate::new(cx, move |prompt, tx, cx| {
2383 this.update(cx, |this, cx| {
2384 let Some((client, _)) = this.downstream_client() else {
2385 return;
2386 };
2387 let response = client.request(proto::AskPassRequest {
2388 project_id,
2389 repository_id: repository_id.to_proto(),
2390 askpass_id,
2391 prompt,
2392 });
2393 cx.spawn(async move |_, _| {
2394 tx.send(response.await?.response).ok();
2395 anyhow::Ok(())
2396 })
2397 .detach_and_log_err(cx);
2398 })
2399 .log_err();
2400 })
2401}
2402
2403impl RepositoryId {
2404 pub fn to_proto(self) -> u64 {
2405 self.0
2406 }
2407
2408 pub fn from_proto(id: u64) -> Self {
2409 RepositoryId(id)
2410 }
2411}
2412
2413impl RepositorySnapshot {
2414 fn empty(id: RepositoryId, work_directory_abs_path: Arc<Path>) -> Self {
2415 Self {
2416 id,
2417 merge_message: None,
2418 statuses_by_path: Default::default(),
2419 work_directory_abs_path,
2420 branch: None,
2421 merge_conflicts: Default::default(),
2422 merge_head_shas: Default::default(),
2423 scan_id: 0,
2424 }
2425 }
2426
2427 fn initial_update(&self, project_id: u64) -> proto::UpdateRepository {
2428 proto::UpdateRepository {
2429 branch_summary: self.branch.as_ref().map(branch_to_proto),
2430 updated_statuses: self
2431 .statuses_by_path
2432 .iter()
2433 .map(|entry| entry.to_proto())
2434 .collect(),
2435 removed_statuses: Default::default(),
2436 current_merge_conflicts: self
2437 .merge_conflicts
2438 .iter()
2439 .map(|repo_path| repo_path.to_proto())
2440 .collect(),
2441 project_id,
2442 id: self.id.to_proto(),
2443 abs_path: self.work_directory_abs_path.to_proto(),
2444 entry_ids: vec![self.id.to_proto()],
2445 scan_id: self.scan_id,
2446 is_last_update: true,
2447 }
2448 }
2449
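    /// Computes the delta between `old` and `self` by walking both status
    /// lists (ordered by repo path) in lockstep: entries that are new or whose
    /// status changed are reported as updated, and entries missing from `self`
    /// are reported as removed.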
2450 fn build_update(&self, old: &Self, project_id: u64) -> proto::UpdateRepository {
2451 let mut updated_statuses: Vec<proto::StatusEntry> = Vec::new();
2452 let mut removed_statuses: Vec<String> = Vec::new();
2453
2454 let mut new_statuses = self.statuses_by_path.iter().peekable();
2455 let mut old_statuses = old.statuses_by_path.iter().peekable();
2456
2457 let mut current_new_entry = new_statuses.next();
2458 let mut current_old_entry = old_statuses.next();
2459 loop {
2460 match (current_new_entry, current_old_entry) {
2461 (Some(new_entry), Some(old_entry)) => {
2462 match new_entry.repo_path.cmp(&old_entry.repo_path) {
2463 Ordering::Less => {
2464 updated_statuses.push(new_entry.to_proto());
2465 current_new_entry = new_statuses.next();
2466 }
2467 Ordering::Equal => {
2468 if new_entry.status != old_entry.status {
2469 updated_statuses.push(new_entry.to_proto());
2470 }
2471 current_old_entry = old_statuses.next();
2472 current_new_entry = new_statuses.next();
2473 }
2474 Ordering::Greater => {
2475 removed_statuses.push(old_entry.repo_path.as_ref().to_proto());
2476 current_old_entry = old_statuses.next();
2477 }
2478 }
2479 }
2480 (None, Some(old_entry)) => {
2481 removed_statuses.push(old_entry.repo_path.as_ref().to_proto());
2482 current_old_entry = old_statuses.next();
2483 }
2484 (Some(new_entry), None) => {
2485 updated_statuses.push(new_entry.to_proto());
2486 current_new_entry = new_statuses.next();
2487 }
2488 (None, None) => break,
2489 }
2490 }
2491
2492 proto::UpdateRepository {
2493 branch_summary: self.branch.as_ref().map(branch_to_proto),
2494 updated_statuses,
2495 removed_statuses,
2496 current_merge_conflicts: self
2497 .merge_conflicts
2498 .iter()
2499 .map(|path| path.as_ref().to_proto())
2500 .collect(),
2501 project_id,
2502 id: self.id.to_proto(),
2503 abs_path: self.work_directory_abs_path.to_proto(),
2504 entry_ids: vec![],
2505 scan_id: self.scan_id,
2506 is_last_update: true,
2507 }
2508 }
2509
2510 pub fn status(&self) -> impl Iterator<Item = StatusEntry> + '_ {
2511 self.statuses_by_path.iter().cloned()
2512 }
2513
2514 pub fn status_summary(&self) -> GitSummary {
2515 self.statuses_by_path.summary().item_summary
2516 }
2517
2518 pub fn status_for_path(&self, path: &RepoPath) -> Option<StatusEntry> {
2519 self.statuses_by_path
2520 .get(&PathKey(path.0.clone()), &())
2521 .cloned()
2522 }
2523
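    /// Converts an absolute path into a path relative to this repository's
    /// work directory, returning `None` if the path lies outside of it.
    /// For example (paths invented for illustration), with a work directory of
    /// `/home/me/project`, the absolute path `/home/me/project/src/lib.rs`
    /// maps to the repo path `src/lib.rs`.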
2524 pub fn abs_path_to_repo_path(&self, abs_path: &Path) -> Option<RepoPath> {
2525 abs_path
2526 .strip_prefix(&self.work_directory_abs_path)
2527 .map(RepoPath::from)
2528 .ok()
2529 }
2530
2531 pub fn has_conflict(&self, repo_path: &RepoPath) -> bool {
2532 self.merge_conflicts.contains(repo_path)
2533 }
2534
2535 /// This is the name that will be displayed in the repository selector for this repository.
2536 pub fn display_name(&self) -> SharedString {
2537 self.work_directory_abs_path
2538 .file_name()
2539 .unwrap_or_default()
2540 .to_string_lossy()
2541 .to_string()
2542 .into()
2543 }
2544}
2545
2546impl Repository {
2547 fn local(
2548 id: RepositoryId,
2549 work_directory_abs_path: Arc<Path>,
2550 dot_git_abs_path: Arc<Path>,
2551 repository_dir_abs_path: Arc<Path>,
2552 common_dir_abs_path: Arc<Path>,
2553 project_environment: WeakEntity<ProjectEnvironment>,
2554 fs: Arc<dyn Fs>,
2555 git_store: WeakEntity<GitStore>,
2556 cx: &mut Context<Self>,
2557 ) -> Self {
2558 let snapshot = RepositorySnapshot::empty(id, work_directory_abs_path.clone());
2559 Repository {
2560 this: cx.weak_entity(),
2561 git_store,
2562 snapshot,
2563 commit_message_buffer: None,
2564 askpass_delegates: Default::default(),
2565 paths_needing_status_update: Default::default(),
2566 latest_askpass_id: 0,
2567 job_sender: Repository::spawn_local_git_worker(
2568 work_directory_abs_path,
2569 dot_git_abs_path,
2570 repository_dir_abs_path,
2571 common_dir_abs_path,
2572 project_environment,
2573 fs,
2574 cx,
2575 ),
2576 job_id: 0,
2577 active_jobs: Default::default(),
2578 }
2579 }
2580
2581 fn remote(
2582 id: RepositoryId,
2583 work_directory_abs_path: Arc<Path>,
2584 project_id: ProjectId,
2585 client: AnyProtoClient,
2586 git_store: WeakEntity<GitStore>,
2587 cx: &mut Context<Self>,
2588 ) -> Self {
2589 let snapshot = RepositorySnapshot::empty(id, work_directory_abs_path);
2590 Self {
2591 this: cx.weak_entity(),
2592 snapshot,
2593 commit_message_buffer: None,
2594 git_store,
2595 paths_needing_status_update: Default::default(),
2596 job_sender: Self::spawn_remote_git_worker(project_id, client, cx),
2597 askpass_delegates: Default::default(),
2598 latest_askpass_id: 0,
2599 active_jobs: Default::default(),
2600 job_id: 0,
2601 }
2602 }
2603
2604 pub fn git_store(&self) -> Option<Entity<GitStore>> {
2605 self.git_store.upgrade()
2606 }
2607
2608 fn reload_buffer_diff_bases(&mut self, cx: &mut Context<Self>) {
2609 let this = cx.weak_entity();
2610 let git_store = self.git_store.clone();
2611 let _ = self.send_keyed_job(
2612 Some(GitJobKey::ReloadBufferDiffBases),
2613 None,
2614 |state, mut cx| async move {
2615 let RepositoryState::Local { backend, .. } = state else {
2616 log::error!("tried to recompute diffs for a non-local repository");
2617 return Ok(());
2618 };
2619
2620 let Some(this) = this.upgrade() else {
2621 return Ok(());
2622 };
2623
2624 let repo_diff_state_updates = this.update(&mut cx, |this, cx| {
2625 git_store.update(cx, |git_store, cx| {
2626 git_store
2627 .diffs
2628 .iter()
2629 .filter_map(|(buffer_id, diff_state)| {
2630 let buffer_store = git_store.buffer_store.read(cx);
2631 let buffer = buffer_store.get(*buffer_id)?;
2632 let file = File::from_dyn(buffer.read(cx).file())?;
2633 let abs_path =
2634 file.worktree.read(cx).absolutize(&file.path).ok()?;
2635 let repo_path = this.abs_path_to_repo_path(&abs_path)?;
2636 log::debug!(
2637 "start reload diff bases for repo path {}",
2638 repo_path.0.display()
2639 );
2640 diff_state.update(cx, |diff_state, _| {
2641 let has_unstaged_diff = diff_state
2642 .unstaged_diff
2643 .as_ref()
2644 .is_some_and(|diff| diff.is_upgradable());
2645 let has_uncommitted_diff = diff_state
2646 .uncommitted_diff
2647 .as_ref()
2648 .is_some_and(|set| set.is_upgradable());
2649
2650 Some((
2651 buffer,
2652 repo_path,
2653 has_unstaged_diff.then(|| diff_state.index_text.clone()),
2654 has_uncommitted_diff.then(|| diff_state.head_text.clone()),
2655 ))
2656 })
2657 })
2658 .collect::<Vec<_>>()
2659 })
2660 })??;
2661
2662 let buffer_diff_base_changes = cx
2663 .background_spawn(async move {
2664 let mut changes = Vec::new();
2665 for (buffer, repo_path, current_index_text, current_head_text) in
2666 &repo_diff_state_updates
2667 {
2668 let index_text = if current_index_text.is_some() {
2669 backend.load_index_text(repo_path.clone()).await
2670 } else {
2671 None
2672 };
2673 let head_text = if current_head_text.is_some() {
2674 backend.load_committed_text(repo_path.clone()).await
2675 } else {
2676 None
2677 };
2678
2679 let change =
2680 match (current_index_text.as_ref(), current_head_text.as_ref()) {
2681 (Some(current_index), Some(current_head)) => {
2682 let index_changed =
2683 index_text.as_ref() != current_index.as_deref();
2684 let head_changed =
2685 head_text.as_ref() != current_head.as_deref();
2686 if index_changed && head_changed {
2687 if index_text == head_text {
2688 Some(DiffBasesChange::SetBoth(head_text))
2689 } else {
2690 Some(DiffBasesChange::SetEach {
2691 index: index_text,
2692 head: head_text,
2693 })
2694 }
2695 } else if index_changed {
2696 Some(DiffBasesChange::SetIndex(index_text))
2697 } else if head_changed {
2698 Some(DiffBasesChange::SetHead(head_text))
2699 } else {
2700 None
2701 }
2702 }
2703 (Some(current_index), None) => {
2704 let index_changed =
2705 index_text.as_ref() != current_index.as_deref();
2706 index_changed
2707 .then_some(DiffBasesChange::SetIndex(index_text))
2708 }
2709 (None, Some(current_head)) => {
2710 let head_changed =
2711 head_text.as_ref() != current_head.as_deref();
2712 head_changed.then_some(DiffBasesChange::SetHead(head_text))
2713 }
2714 (None, None) => None,
2715 };
2716
2717 changes.push((buffer.clone(), change))
2718 }
2719 changes
2720 })
2721 .await;
2722
2723 git_store.update(&mut cx, |git_store, cx| {
2724 for (buffer, diff_bases_change) in buffer_diff_base_changes {
2725 let buffer_snapshot = buffer.read(cx).text_snapshot();
2726 let buffer_id = buffer_snapshot.remote_id();
2727 let Some(diff_state) = git_store.diffs.get(&buffer_id) else {
2728 continue;
2729 };
2730
2731 let downstream_client = git_store.downstream_client();
2732 diff_state.update(cx, |diff_state, cx| {
2733 use proto::update_diff_bases::Mode;
2734
2735 if let Some((diff_bases_change, (client, project_id))) =
2736 diff_bases_change.clone().zip(downstream_client)
2737 {
2738 let (staged_text, committed_text, mode) = match diff_bases_change {
2739 DiffBasesChange::SetIndex(index) => {
2740 (index, None, Mode::IndexOnly)
2741 }
2742 DiffBasesChange::SetHead(head) => (None, head, Mode::HeadOnly),
2743 DiffBasesChange::SetEach { index, head } => {
2744 (index, head, Mode::IndexAndHead)
2745 }
2746 DiffBasesChange::SetBoth(text) => {
2747 (None, text, Mode::IndexMatchesHead)
2748 }
2749 };
2750 client
2751 .send(proto::UpdateDiffBases {
2752 project_id: project_id.to_proto(),
2753 buffer_id: buffer_id.to_proto(),
2754 staged_text,
2755 committed_text,
2756 mode: mode as i32,
2757 })
2758 .log_err();
2759 }
2760
2761 diff_state.diff_bases_changed(buffer_snapshot, diff_bases_change, cx);
2762 });
2763 }
2764 })
2765 },
2766 );
2767 }
2768
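    /// Enqueues a job on this repository's background git worker and returns a
    /// receiver that resolves with the job's result. When `status` is `Some`,
    /// the message is shown as an active job while the work is running.
    ///
    /// A hedged usage sketch (not compiled; the closure body and error
    /// handling are illustrative, and `repository` is assumed to be an
    /// `Entity<Repository>` used from an async task):
    ///
    /// ```ignore
    /// let rx = repository.update(cx, |repo, _| {
    ///     repo.send_job(Some("git branch".into()), |state, _cx| async move {
    ///         match state {
    ///             RepositoryState::Local { backend, .. } => backend.branches().await,
    ///             RepositoryState::Remote { .. } => Err(anyhow!("remote arm omitted from this sketch")),
    ///         }
    ///     })
    /// })?;
    /// let branches = rx.await??;
    /// ```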
2769 pub fn send_job<F, Fut, R>(
2770 &mut self,
2771 status: Option<SharedString>,
2772 job: F,
2773 ) -> oneshot::Receiver<R>
2774 where
2775 F: FnOnce(RepositoryState, AsyncApp) -> Fut + 'static,
2776 Fut: Future<Output = R> + 'static,
2777 R: Send + 'static,
2778 {
2779 self.send_keyed_job(None, status, job)
2780 }
2781
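    /// Like [`Repository::send_job`], but with an optional [`GitJobKey`] that
    /// lets the worker coalesce redundant work: a queued keyed job is skipped
    /// whenever a newer job with the same key is still waiting (see the
    /// dispatch loops in `spawn_local_git_worker` and `spawn_remote_git_worker`).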
2782 fn send_keyed_job<F, Fut, R>(
2783 &mut self,
2784 key: Option<GitJobKey>,
2785 status: Option<SharedString>,
2786 job: F,
2787 ) -> oneshot::Receiver<R>
2788 where
2789 F: FnOnce(RepositoryState, AsyncApp) -> Fut + 'static,
2790 Fut: Future<Output = R> + 'static,
2791 R: Send + 'static,
2792 {
2793 let (result_tx, result_rx) = futures::channel::oneshot::channel();
2794 let job_id = post_inc(&mut self.job_id);
2795 let this = self.this.clone();
2796 self.job_sender
2797 .unbounded_send(GitJob {
2798 key,
2799 job: Box::new(move |state, cx: &mut AsyncApp| {
2800 let job = job(state, cx.clone());
2801 cx.spawn(async move |cx| {
2802 if let Some(s) = status.clone() {
2803 this.update(cx, |this, cx| {
2804 this.active_jobs.insert(
2805 job_id,
2806 JobInfo {
2807 start: Instant::now(),
2808 message: s.clone(),
2809 },
2810 );
2811
2812 cx.notify();
2813 })
2814 .ok();
2815 }
2816 let result = job.await;
2817
2818 this.update(cx, |this, cx| {
2819 this.active_jobs.remove(&job_id);
2820 cx.notify();
2821 })
2822 .ok();
2823
2824 result_tx.send(result).ok();
2825 })
2826 }),
2827 })
2828 .ok();
2829 result_rx
2830 }
2831
2832 pub fn set_as_active_repository(&self, cx: &mut Context<Self>) {
2833 let Some(git_store) = self.git_store.upgrade() else {
2834 return;
2835 };
2836 let entity = cx.entity();
2837 git_store.update(cx, |git_store, cx| {
2838 let Some((&id, _)) = git_store
2839 .repositories
2840 .iter()
2841 .find(|(_, handle)| *handle == &entity)
2842 else {
2843 return;
2844 };
2845 git_store.active_repo_id = Some(id);
2846 cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
2847 });
2848 }
2849
2850 pub fn cached_status(&self) -> impl '_ + Iterator<Item = StatusEntry> {
2851 self.snapshot.status()
2852 }
2853
2854 pub fn repo_path_to_project_path(&self, path: &RepoPath, cx: &App) -> Option<ProjectPath> {
2855 let git_store = self.git_store.upgrade()?;
2856 let worktree_store = git_store.read(cx).worktree_store.read(cx);
2857 let abs_path = self.snapshot.work_directory_abs_path.join(&path.0);
2858 let (worktree, relative_path) = worktree_store.find_worktree(abs_path, cx)?;
2859 Some(ProjectPath {
2860 worktree_id: worktree.read(cx).id(),
2861 path: relative_path.into(),
2862 })
2863 }
2864
2865 pub fn project_path_to_repo_path(&self, path: &ProjectPath, cx: &App) -> Option<RepoPath> {
2866 let git_store = self.git_store.upgrade()?;
2867 let worktree_store = git_store.read(cx).worktree_store.read(cx);
2868 let abs_path = worktree_store.absolutize(path, cx)?;
2869 self.snapshot.abs_path_to_repo_path(&abs_path)
2870 }
2871
2872 pub fn contains_sub_repo(&self, other: &Entity<Self>, cx: &App) -> bool {
2873 other
2874 .read(cx)
2875 .snapshot
2876 .work_directory_abs_path
2877 .starts_with(&self.snapshot.work_directory_abs_path)
2878 }
2879
2880 pub fn open_commit_buffer(
2881 &mut self,
2882 languages: Option<Arc<LanguageRegistry>>,
2883 buffer_store: Entity<BufferStore>,
2884 cx: &mut Context<Self>,
2885 ) -> Task<Result<Entity<Buffer>>> {
2886 let id = self.id;
2887 if let Some(buffer) = self.commit_message_buffer.clone() {
2888 return Task::ready(Ok(buffer));
2889 }
2890 let this = cx.weak_entity();
2891
2892 let rx = self.send_job(None, move |state, mut cx| async move {
2893 let Some(this) = this.upgrade() else {
2894 bail!("git store was dropped");
2895 };
2896 match state {
2897 RepositoryState::Local { .. } => {
2898 this.update(&mut cx, |_, cx| {
2899 Self::open_local_commit_buffer(languages, buffer_store, cx)
2900 })?
2901 .await
2902 }
2903 RepositoryState::Remote { project_id, client } => {
2904 let request = client.request(proto::OpenCommitMessageBuffer {
2905 project_id: project_id.0,
2906 repository_id: id.to_proto(),
2907 });
2908 let response = request.await.context("requesting to open commit buffer")?;
2909 let buffer_id = BufferId::new(response.buffer_id)?;
2910 let buffer = buffer_store
2911 .update(&mut cx, |buffer_store, cx| {
2912 buffer_store.wait_for_remote_buffer(buffer_id, cx)
2913 })?
2914 .await?;
2915 if let Some(language_registry) = languages {
2916 let git_commit_language =
2917 language_registry.language_for_name("Git Commit").await?;
2918 buffer.update(&mut cx, |buffer, cx| {
2919 buffer.set_language(Some(git_commit_language), cx);
2920 })?;
2921 }
2922 this.update(&mut cx, |this, _| {
2923 this.commit_message_buffer = Some(buffer.clone());
2924 })?;
2925 Ok(buffer)
2926 }
2927 }
2928 });
2929
2930 cx.spawn(|_, _: &mut AsyncApp| async move { rx.await? })
2931 }
2932
2933 fn open_local_commit_buffer(
2934 language_registry: Option<Arc<LanguageRegistry>>,
2935 buffer_store: Entity<BufferStore>,
2936 cx: &mut Context<Self>,
2937 ) -> Task<Result<Entity<Buffer>>> {
2938 cx.spawn(async move |repository, cx| {
2939 let buffer = buffer_store
2940 .update(cx, |buffer_store, cx| buffer_store.create_buffer(cx))?
2941 .await?;
2942
2943 if let Some(language_registry) = language_registry {
2944 let git_commit_language = language_registry.language_for_name("Git Commit").await?;
2945 buffer.update(cx, |buffer, cx| {
2946 buffer.set_language(Some(git_commit_language), cx);
2947 })?;
2948 }
2949
2950 repository.update(cx, |repository, _| {
2951 repository.commit_message_buffer = Some(buffer.clone());
2952 })?;
2953 Ok(buffer)
2954 })
2955 }
2956
2957 pub fn checkout_files(
2958 &mut self,
2959 commit: &str,
2960 paths: Vec<RepoPath>,
2961 _cx: &mut App,
2962 ) -> oneshot::Receiver<Result<()>> {
2963 let commit = commit.to_string();
2964 let id = self.id;
2965
2966 self.send_job(
2967 Some(format!("git checkout {}", commit).into()),
2968 move |git_repo, _| async move {
2969 match git_repo {
2970 RepositoryState::Local {
2971 backend,
2972 environment,
2973 ..
2974 } => {
2975 backend
2976 .checkout_files(commit, paths, environment.clone())
2977 .await
2978 }
2979 RepositoryState::Remote { project_id, client } => {
2980 client
2981 .request(proto::GitCheckoutFiles {
2982 project_id: project_id.0,
2983 repository_id: id.to_proto(),
2984 commit,
2985 paths: paths
2986 .into_iter()
2987 .map(|p| p.to_string_lossy().to_string())
2988 .collect(),
2989 })
2990 .await?;
2991
2992 Ok(())
2993 }
2994 }
2995 },
2996 )
2997 }
2998
2999 pub fn reset(
3000 &mut self,
3001 commit: String,
3002 reset_mode: ResetMode,
3003 _cx: &mut App,
3004 ) -> oneshot::Receiver<Result<()>> {
3006 let id = self.id;
3007
3008 self.send_job(None, move |git_repo, _| async move {
3009 match git_repo {
3010 RepositoryState::Local {
3011 backend,
3012 environment,
3013 ..
3014 } => backend.reset(commit, reset_mode, environment).await,
3015 RepositoryState::Remote { project_id, client } => {
3016 client
3017 .request(proto::GitReset {
3018 project_id: project_id.0,
3019 repository_id: id.to_proto(),
3020 commit,
3021 mode: match reset_mode {
3022 ResetMode::Soft => git_reset::ResetMode::Soft.into(),
3023 ResetMode::Mixed => git_reset::ResetMode::Mixed.into(),
3024 },
3025 })
3026 .await?;
3027
3028 Ok(())
3029 }
3030 }
3031 })
3032 }
3033
3034 pub fn show(&mut self, commit: String) -> oneshot::Receiver<Result<CommitDetails>> {
3035 let id = self.id;
3036 self.send_job(None, move |git_repo, _cx| async move {
3037 match git_repo {
3038 RepositoryState::Local { backend, .. } => backend.show(commit).await,
3039 RepositoryState::Remote { project_id, client } => {
3040 let resp = client
3041 .request(proto::GitShow {
3042 project_id: project_id.0,
3043 repository_id: id.to_proto(),
3044 commit,
3045 })
3046 .await?;
3047
3048 Ok(CommitDetails {
3049 sha: resp.sha.into(),
3050 message: resp.message.into(),
3051 commit_timestamp: resp.commit_timestamp,
3052 author_email: resp.author_email.into(),
3053 author_name: resp.author_name.into(),
3054 })
3055 }
3056 }
3057 })
3058 }
3059
3060 pub fn load_commit_diff(&mut self, commit: String) -> oneshot::Receiver<Result<CommitDiff>> {
3061 let id = self.id;
3062 self.send_job(None, move |git_repo, cx| async move {
3063 match git_repo {
3064 RepositoryState::Local { backend, .. } => backend.load_commit(commit, cx).await,
3065 RepositoryState::Remote {
3066 client, project_id, ..
3067 } => {
3068 let response = client
3069 .request(proto::LoadCommitDiff {
3070 project_id: project_id.0,
3071 repository_id: id.to_proto(),
3072 commit,
3073 })
3074 .await?;
3075 Ok(CommitDiff {
3076 files: response
3077 .files
3078 .into_iter()
3079 .map(|file| CommitFile {
3080 path: Path::new(&file.path).into(),
3081 old_text: file.old_text,
3082 new_text: file.new_text,
3083 })
3084 .collect(),
3085 })
3086 }
3087 }
3088 })
3089 }
3090
3091 fn buffer_store(&self, cx: &App) -> Option<Entity<BufferStore>> {
3092 Some(self.git_store.upgrade()?.read(cx).buffer_store.clone())
3093 }
3094
3095 pub fn stage_entries(
3096 &self,
3097 entries: Vec<RepoPath>,
3098 cx: &mut Context<Self>,
3099 ) -> Task<anyhow::Result<()>> {
3100 if entries.is_empty() {
3101 return Task::ready(Ok(()));
3102 }
3103 let id = self.id;
3104
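        // Save any open buffers for these paths first (as long as their files
        // still exist on disk), so the content that gets staged reflects
        // unsaved edits in the editor.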
3105 let mut save_futures = Vec::new();
3106 if let Some(buffer_store) = self.buffer_store(cx) {
3107 buffer_store.update(cx, |buffer_store, cx| {
3108 for path in &entries {
3109 let Some(project_path) = self.repo_path_to_project_path(path, cx) else {
3110 continue;
3111 };
3112 if let Some(buffer) = buffer_store.get_by_path(&project_path, cx) {
3113 if buffer
3114 .read(cx)
3115 .file()
3116 .map_or(false, |file| file.disk_state().exists())
3117 {
3118 save_futures.push(buffer_store.save_buffer(buffer, cx));
3119 }
3120 }
3121 }
3122 })
3123 }
3124
3125 cx.spawn(async move |this, cx| {
3126 for save_future in save_futures {
3127 save_future.await?;
3128 }
3129
3130 this.update(cx, |this, _| {
3131 this.send_job(None, move |git_repo, _cx| async move {
3132 match git_repo {
3133 RepositoryState::Local {
3134 backend,
3135 environment,
3136 ..
3137 } => backend.stage_paths(entries, environment.clone()).await,
3138 RepositoryState::Remote { project_id, client } => {
3139 client
3140 .request(proto::Stage {
3141 project_id: project_id.0,
3142 repository_id: id.to_proto(),
3143 paths: entries
3144 .into_iter()
3145 .map(|repo_path| repo_path.as_ref().to_proto())
3146 .collect(),
3147 })
3148 .await
3149 .context("sending stage request")?;
3150
3151 Ok(())
3152 }
3153 }
3154 })
3155 })?
3156 .await??;
3157
3158 Ok(())
3159 })
3160 }
3161
3162 pub fn unstage_entries(
3163 &self,
3164 entries: Vec<RepoPath>,
3165 cx: &mut Context<Self>,
3166 ) -> Task<anyhow::Result<()>> {
3167 if entries.is_empty() {
3168 return Task::ready(Ok(()));
3169 }
3170 let id = self.id;
3171
3172 let mut save_futures = Vec::new();
3173 if let Some(buffer_store) = self.buffer_store(cx) {
3174 buffer_store.update(cx, |buffer_store, cx| {
3175 for path in &entries {
3176 let Some(project_path) = self.repo_path_to_project_path(path, cx) else {
3177 continue;
3178 };
3179 if let Some(buffer) = buffer_store.get_by_path(&project_path, cx) {
3180 if buffer
3181 .read(cx)
3182 .file()
3183 .map_or(false, |file| file.disk_state().exists())
3184 {
3185 save_futures.push(buffer_store.save_buffer(buffer, cx));
3186 }
3187 }
3188 }
3189 })
3190 }
3191
3192 cx.spawn(async move |this, cx| {
3193 for save_future in save_futures {
3194 save_future.await?;
3195 }
3196
3197 this.update(cx, |this, _| {
3198 this.send_job(None, move |git_repo, _cx| async move {
3199 match git_repo {
3200 RepositoryState::Local {
3201 backend,
3202 environment,
3203 ..
3204 } => backend.unstage_paths(entries, environment).await,
3205 RepositoryState::Remote { project_id, client } => {
3206 client
3207 .request(proto::Unstage {
3208 project_id: project_id.0,
3209 repository_id: id.to_proto(),
3210 paths: entries
3211 .into_iter()
3212 .map(|repo_path| repo_path.as_ref().to_proto())
3213 .collect(),
3214 })
3215 .await
3216 .context("sending unstage request")?;
3217
3218 Ok(())
3219 }
3220 }
3221 })
3222 })?
3223 .await??;
3224
3225 Ok(())
3226 })
3227 }
3228
3229 pub fn stage_all(&self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
3230 let to_stage = self
3231 .cached_status()
3232 .filter(|entry| !entry.status.staging().is_fully_staged())
3233 .map(|entry| entry.repo_path.clone())
3234 .collect();
3235 self.stage_entries(to_stage, cx)
3236 }
3237
3238 pub fn unstage_all(&self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
3239 let to_unstage = self
3240 .cached_status()
3241 .filter(|entry| entry.status.staging().has_staged())
3242 .map(|entry| entry.repo_path.clone())
3243 .collect();
3244 self.unstage_entries(to_unstage, cx)
3245 }
3246
3247 pub fn commit(
3248 &mut self,
3249 message: SharedString,
3250 name_and_email: Option<(SharedString, SharedString)>,
3251 _cx: &mut App,
3252 ) -> oneshot::Receiver<Result<()>> {
3253 let id = self.id;
3254
3255 self.send_job(Some("git commit".into()), move |git_repo, _cx| async move {
3256 match git_repo {
3257 RepositoryState::Local {
3258 backend,
3259 environment,
3260 ..
3261 } => backend.commit(message, name_and_email, environment).await,
3262 RepositoryState::Remote { project_id, client } => {
3263 let (name, email) = name_and_email.unzip();
3264 client
3265 .request(proto::Commit {
3266 project_id: project_id.0,
3267 repository_id: id.to_proto(),
3268 message: String::from(message),
3269 name: name.map(String::from),
3270 email: email.map(String::from),
3271 })
3272 .await
3273 .context("sending commit request")?;
3274
3275 Ok(())
3276 }
3277 }
3278 })
3279 }
3280
3281 pub fn fetch(
3282 &mut self,
3283 askpass: AskPassDelegate,
3284 _cx: &mut App,
3285 ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
3286 let askpass_delegates = self.askpass_delegates.clone();
3287 let askpass_id = util::post_inc(&mut self.latest_askpass_id);
3288 let id = self.id;
3289
3290 self.send_job(Some("git fetch".into()), move |git_repo, cx| async move {
3291 match git_repo {
3292 RepositoryState::Local {
3293 backend,
3294 environment,
3295 ..
3296 } => backend.fetch(askpass, environment, cx).await,
3297 RepositoryState::Remote { project_id, client } => {
3298 askpass_delegates.lock().insert(askpass_id, askpass);
3299 let _defer = util::defer(|| {
3300 let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
3301 debug_assert!(askpass_delegate.is_some());
3302 });
3303
3304 let response = client
3305 .request(proto::Fetch {
3306 project_id: project_id.0,
3307 repository_id: id.to_proto(),
3308 askpass_id,
3309 })
3310 .await
3311 .context("sending fetch request")?;
3312
3313 Ok(RemoteCommandOutput {
3314 stdout: response.stdout,
3315 stderr: response.stderr,
3316 })
3317 }
3318 }
3319 })
3320 }
3321
3322 pub fn push(
3323 &mut self,
3324 branch: SharedString,
3325 remote: SharedString,
3326 options: Option<PushOptions>,
3327 askpass: AskPassDelegate,
3328 cx: &mut Context<Self>,
3329 ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
3330 let askpass_delegates = self.askpass_delegates.clone();
3331 let askpass_id = util::post_inc(&mut self.latest_askpass_id);
3332 let id = self.id;
3333
3334 let args = options
3335 .map(|option| match option {
3336 PushOptions::SetUpstream => " --set-upstream",
3337 PushOptions::Force => " --force",
3338 })
3339 .unwrap_or("");
3340
3341 let updates_tx = self
3342 .git_store()
3343 .and_then(|git_store| match &git_store.read(cx).state {
3344 GitStoreState::Local { downstream, .. } => downstream
3345 .as_ref()
3346 .map(|downstream| downstream.updates_tx.clone()),
3347 _ => None,
3348 });
3349
3350 let this = cx.weak_entity();
3351 self.send_job(
3352 Some(format!("git push {} {} {}", args, branch, remote).into()),
3353 move |git_repo, mut cx| async move {
3354 match git_repo {
3355 RepositoryState::Local {
3356 backend,
3357 environment,
3358 ..
3359 } => {
3360 let result = backend
3361 .push(
3362 branch.to_string(),
3363 remote.to_string(),
3364 options,
3365 askpass,
3366 environment.clone(),
3367 cx.clone(),
3368 )
3369 .await;
3370 if result.is_ok() {
3371 let branches = backend.branches().await?;
3372 let branch = branches.into_iter().find(|branch| branch.is_head);
3373 log::info!("head branch after scan is {branch:?}");
3374 let snapshot = this.update(&mut cx, |this, cx| {
3375 this.snapshot.branch = branch;
3376 let snapshot = this.snapshot.clone();
3377 cx.emit(RepositoryEvent::Updated { full_scan: false });
3378 snapshot
3379 })?;
3380 if let Some(updates_tx) = updates_tx {
3381 updates_tx
3382 .unbounded_send(DownstreamUpdate::UpdateRepository(snapshot))
3383 .ok();
3384 }
3385 }
3386 result
3387 }
3388 RepositoryState::Remote { project_id, client } => {
3389 askpass_delegates.lock().insert(askpass_id, askpass);
3390 let _defer = util::defer(|| {
3391 let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
3392 debug_assert!(askpass_delegate.is_some());
3393 });
3394 let response = client
3395 .request(proto::Push {
3396 project_id: project_id.0,
3397 repository_id: id.to_proto(),
3398 askpass_id,
3399 branch_name: branch.to_string(),
3400 remote_name: remote.to_string(),
3401 options: options.map(|options| match options {
3402 PushOptions::Force => proto::push::PushOptions::Force,
3403 PushOptions::SetUpstream => {
3404 proto::push::PushOptions::SetUpstream
3405 }
3406 }
3407 as i32),
3408 })
3409 .await
3410 .context("sending push request")?;
3411
3412 Ok(RemoteCommandOutput {
3413 stdout: response.stdout,
3414 stderr: response.stderr,
3415 })
3416 }
3417 }
3418 },
3419 )
3420 }
3421
3422 pub fn pull(
3423 &mut self,
3424 branch: SharedString,
3425 remote: SharedString,
3426 askpass: AskPassDelegate,
3427 _cx: &mut App,
3428 ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
3429 let askpass_delegates = self.askpass_delegates.clone();
3430 let askpass_id = util::post_inc(&mut self.latest_askpass_id);
3431 let id = self.id;
3432
3433 self.send_job(
3434 Some(format!("git pull {} {}", remote, branch).into()),
3435 move |git_repo, cx| async move {
3436 match git_repo {
3437 RepositoryState::Local {
3438 backend,
3439 environment,
3440 ..
3441 } => {
3442 backend
3443 .pull(
3444 branch.to_string(),
3445 remote.to_string(),
3446 askpass,
3447 environment.clone(),
3448 cx,
3449 )
3450 .await
3451 }
3452 RepositoryState::Remote { project_id, client } => {
3453 askpass_delegates.lock().insert(askpass_id, askpass);
3454 let _defer = util::defer(|| {
3455 let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
3456 debug_assert!(askpass_delegate.is_some());
3457 });
3458 let response = client
3459 .request(proto::Pull {
3460 project_id: project_id.0,
3461 repository_id: id.to_proto(),
3462 askpass_id,
3463 branch_name: branch.to_string(),
3464 remote_name: remote.to_string(),
3465 })
3466 .await
3467 .context("sending pull request")?;
3468
3469 Ok(RemoteCommandOutput {
3470 stdout: response.stdout,
3471 stderr: response.stderr,
3472 })
3473 }
3474 }
3475 },
3476 )
3477 }
3478
3479 fn spawn_set_index_text_job(
3480 &mut self,
3481 path: RepoPath,
3482 content: Option<String>,
3483 hunk_staging_operation_count: Option<usize>,
3484 cx: &mut Context<Self>,
3485 ) -> oneshot::Receiver<anyhow::Result<()>> {
3486 let id = self.id;
3487 let this = cx.weak_entity();
3488 let git_store = self.git_store.clone();
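        // Key the job by path so that rapid successive index writes for the
        // same file are coalesced by the git worker, which keeps only the most
        // recent pending write.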
3489 self.send_keyed_job(
3490 Some(GitJobKey::WriteIndex(path.clone())),
3491 None,
3492 move |git_repo, mut cx| async move {
3493 log::debug!("start updating index text for buffer {}", path.display());
3494 match git_repo {
3495 RepositoryState::Local {
3496 backend,
3497 environment,
3498 ..
3499 } => {
3500 backend
3501 .set_index_text(path.clone(), content, environment.clone())
3502 .await?;
3503 }
3504 RepositoryState::Remote { project_id, client } => {
3505 client
3506 .request(proto::SetIndexText {
3507 project_id: project_id.0,
3508 repository_id: id.to_proto(),
3509 path: path.as_ref().to_proto(),
3510 text: content,
3511 })
3512 .await?;
3513 }
3514 }
3515 log::debug!("finish updating index text for buffer {}", path.display());
3516
3517 if let Some(hunk_staging_operation_count) = hunk_staging_operation_count {
3518 let project_path = this
3519 .read_with(&cx, |this, cx| this.repo_path_to_project_path(&path, cx))
3520 .ok()
3521 .flatten();
3522 git_store.update(&mut cx, |git_store, cx| {
3523 let buffer_id = git_store
3524 .buffer_store
3525 .read(cx)
3526 .get_by_path(&project_path?, cx)?
3527 .read(cx)
3528 .remote_id();
3529 let diff_state = git_store.diffs.get(&buffer_id)?;
3530 diff_state.update(cx, |diff_state, _| {
3531 diff_state.hunk_staging_operation_count_as_of_write =
3532 hunk_staging_operation_count;
3533 });
3534 Some(())
3535 })?;
3536 }
3537 Ok(())
3538 },
3539 )
3540 }
3541
3542 pub fn get_remotes(
3543 &mut self,
3544 branch_name: Option<String>,
3545 ) -> oneshot::Receiver<Result<Vec<Remote>>> {
3546 let id = self.id;
3547 self.send_job(None, move |repo, _cx| async move {
3548 match repo {
3549 RepositoryState::Local { backend, .. } => backend.get_remotes(branch_name).await,
3550 RepositoryState::Remote { project_id, client } => {
3551 let response = client
3552 .request(proto::GetRemotes {
3553 project_id: project_id.0,
3554 repository_id: id.to_proto(),
3555 branch_name,
3556 })
3557 .await?;
3558
3559 let remotes = response
3560 .remotes
3561 .into_iter()
3562 .map(|remotes| git::repository::Remote {
3563 name: remotes.name.into(),
3564 })
3565 .collect();
3566
3567 Ok(remotes)
3568 }
3569 }
3570 })
3571 }
3572
3573 pub fn branches(&mut self) -> oneshot::Receiver<Result<Vec<Branch>>> {
3574 let id = self.id;
3575 self.send_job(None, move |repo, cx| async move {
3576 match repo {
3577 RepositoryState::Local { backend, .. } => {
3578 let backend = backend.clone();
3579 cx.background_spawn(async move { backend.branches().await })
3580 .await
3581 }
3582 RepositoryState::Remote { project_id, client } => {
3583 let response = client
3584 .request(proto::GitGetBranches {
3585 project_id: project_id.0,
3586 repository_id: id.to_proto(),
3587 })
3588 .await?;
3589
3590 let branches = response
3591 .branches
3592 .into_iter()
3593 .map(|branch| proto_to_branch(&branch))
3594 .collect();
3595
3596 Ok(branches)
3597 }
3598 }
3599 })
3600 }
3601
3602 pub fn diff(&mut self, diff_type: DiffType, _cx: &App) -> oneshot::Receiver<Result<String>> {
3603 let id = self.id;
3604 self.send_job(None, move |repo, _cx| async move {
3605 match repo {
3606 RepositoryState::Local { backend, .. } => backend.diff(diff_type).await,
3607 RepositoryState::Remote { project_id, client } => {
3608 let response = client
3609 .request(proto::GitDiff {
3610 project_id: project_id.0,
3611 repository_id: id.to_proto(),
3612 diff_type: match diff_type {
3613 DiffType::HeadToIndex => {
3614 proto::git_diff::DiffType::HeadToIndex.into()
3615 }
3616 DiffType::HeadToWorktree => {
3617 proto::git_diff::DiffType::HeadToWorktree.into()
3618 }
3619 },
3620 })
3621 .await?;
3622
3623 Ok(response.diff)
3624 }
3625 }
3626 })
3627 }
3628
3629 pub fn create_branch(&mut self, branch_name: String) -> oneshot::Receiver<Result<()>> {
3630 let id = self.id;
3631 self.send_job(
3632 Some(format!("git switch -c {branch_name}").into()),
3633 move |repo, _cx| async move {
3634 match repo {
3635 RepositoryState::Local { backend, .. } => {
3636 backend.create_branch(branch_name).await
3637 }
3638 RepositoryState::Remote { project_id, client } => {
3639 client
3640 .request(proto::GitCreateBranch {
3641 project_id: project_id.0,
3642 repository_id: id.to_proto(),
3643 branch_name,
3644 })
3645 .await?;
3646
3647 Ok(())
3648 }
3649 }
3650 },
3651 )
3652 }
3653
3654 pub fn change_branch(&mut self, branch_name: String) -> oneshot::Receiver<Result<()>> {
3655 let id = self.id;
3656 self.send_job(
3657 Some(format!("git switch {branch_name}").into()),
3658 move |repo, _cx| async move {
3659 match repo {
3660 RepositoryState::Local { backend, .. } => {
3661 backend.change_branch(branch_name).await
3662 }
3663 RepositoryState::Remote { project_id, client } => {
3664 client
3665 .request(proto::GitChangeBranch {
3666 project_id: project_id.0,
3667 repository_id: id.to_proto(),
3668 branch_name,
3669 })
3670 .await?;
3671
3672 Ok(())
3673 }
3674 }
3675 },
3676 )
3677 }
3678
3679 pub fn check_for_pushed_commits(&mut self) -> oneshot::Receiver<Result<Vec<SharedString>>> {
3680 let id = self.id;
3681 self.send_job(None, move |repo, _cx| async move {
3682 match repo {
3683 RepositoryState::Local { backend, .. } => backend.check_for_pushed_commit().await,
3684 RepositoryState::Remote { project_id, client } => {
3685 let response = client
3686 .request(proto::CheckForPushedCommits {
3687 project_id: project_id.0,
3688 repository_id: id.to_proto(),
3689 })
3690 .await?;
3691
3692 let branches = response.pushed_to.into_iter().map(Into::into).collect();
3693
3694 Ok(branches)
3695 }
3696 }
3697 })
3698 }
3699
3700 pub fn checkpoint(&mut self) -> oneshot::Receiver<Result<GitRepositoryCheckpoint>> {
3701 self.send_job(None, |repo, _cx| async move {
3702 match repo {
3703 RepositoryState::Local { backend, .. } => backend.checkpoint().await,
3704 RepositoryState::Remote { .. } => Err(anyhow!("not implemented yet")),
3705 }
3706 })
3707 }
3708
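    /// Restores the repository to a previously captured checkpoint.
    ///
    /// A hedged round-trip sketch (identifiers and error handling are
    /// illustrative; `repo` is assumed to be an `Entity<Repository>` used from
    /// an async task):
    ///
    /// ```ignore
    /// let checkpoint = repo.update(cx, |repo, _| repo.checkpoint())?.await??;
    /// // ... make speculative changes to the working tree ...
    /// repo.update(cx, |repo, _| repo.restore_checkpoint(checkpoint))?.await??;
    /// ```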
3709 pub fn restore_checkpoint(
3710 &mut self,
3711 checkpoint: GitRepositoryCheckpoint,
3712 ) -> oneshot::Receiver<Result<()>> {
3713 self.send_job(None, move |repo, _cx| async move {
3714 match repo {
3715 RepositoryState::Local { backend, .. } => {
3716 backend.restore_checkpoint(checkpoint).await
3717 }
3718 RepositoryState::Remote { .. } => Err(anyhow!("not implemented yet")),
3719 }
3720 })
3721 }
3722
3723 pub(crate) fn apply_remote_update(
3724 &mut self,
3725 update: proto::UpdateRepository,
3726 cx: &mut Context<Self>,
3727 ) -> Result<()> {
3728 let conflicted_paths = TreeSet::from_ordered_entries(
3729 update
3730 .current_merge_conflicts
3731 .into_iter()
3732 .map(|path| RepoPath(Path::new(&path).into())),
3733 );
3734 self.snapshot.branch = update.branch_summary.as_ref().map(proto_to_branch);
3735 self.snapshot.merge_conflicts = conflicted_paths;
3736
3737 let edits = update
3738 .removed_statuses
3739 .into_iter()
3740 .map(|path| sum_tree::Edit::Remove(PathKey(FromProto::from_proto(path))))
3741 .chain(
3742 update
3743 .updated_statuses
3744 .into_iter()
3745 .filter_map(|updated_status| {
3746 Some(sum_tree::Edit::Insert(updated_status.try_into().log_err()?))
3747 }),
3748 )
3749 .collect::<Vec<_>>();
3750 self.snapshot.statuses_by_path.edit(edits, &());
3751 if update.is_last_update {
3752 self.snapshot.scan_id = update.scan_id;
3753 }
3754 cx.emit(RepositoryEvent::Updated { full_scan: true });
3755 Ok(())
3756 }
3757
3758 pub fn compare_checkpoints(
3759 &mut self,
3760 left: GitRepositoryCheckpoint,
3761 right: GitRepositoryCheckpoint,
3762 ) -> oneshot::Receiver<Result<bool>> {
3763 self.send_job(None, move |repo, _cx| async move {
3764 match repo {
3765 RepositoryState::Local { backend, .. } => {
3766 backend.compare_checkpoints(left, right).await
3767 }
3768 RepositoryState::Remote { .. } => Err(anyhow!("not implemented yet")),
3769 }
3770 })
3771 }
3772
3773 pub fn delete_checkpoint(
3774 &mut self,
3775 checkpoint: GitRepositoryCheckpoint,
3776 ) -> oneshot::Receiver<Result<()>> {
3777 self.send_job(None, move |repo, _cx| async move {
3778 match repo {
3779 RepositoryState::Local { backend, .. } => {
3780 backend.delete_checkpoint(checkpoint).await
3781 }
3782 RepositoryState::Remote { .. } => Err(anyhow!("not implemented yet")),
3783 }
3784 })
3785 }
3786
3787 pub fn diff_checkpoints(
3788 &mut self,
3789 base_checkpoint: GitRepositoryCheckpoint,
3790 target_checkpoint: GitRepositoryCheckpoint,
3791 ) -> oneshot::Receiver<Result<String>> {
3792 self.send_job(None, move |repo, _cx| async move {
3793 match repo {
3794 RepositoryState::Local { backend, .. } => {
3795 backend
3796 .diff_checkpoints(base_checkpoint, target_checkpoint)
3797 .await
3798 }
3799 RepositoryState::Remote { .. } => Err(anyhow!("not implemented yet")),
3800 }
3801 })
3802 }
3803
3804 fn schedule_scan(
3805 &mut self,
3806 updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
3807 cx: &mut Context<Self>,
3808 ) {
3809 let this = cx.weak_entity();
3810 let _ = self.send_keyed_job(
3811 Some(GitJobKey::ReloadGitState),
3812 None,
3813 |state, mut cx| async move {
3814 let Some(this) = this.upgrade() else {
3815 return Ok(());
3816 };
3817 let RepositoryState::Local { backend, .. } = state else {
3818 bail!("not a local repository")
3819 };
3820 let (snapshot, events) = this
3821 .update(&mut cx, |this, _| {
3822 compute_snapshot(
3823 this.id,
3824 this.work_directory_abs_path.clone(),
3825 this.snapshot.clone(),
3826 backend.clone(),
3827 )
3828 })?
3829 .await?;
3830 this.update(&mut cx, |this, cx| {
3831 this.snapshot = snapshot.clone();
3832 for event in events {
3833 cx.emit(event);
3834 }
3835 })?;
3836 if let Some(updates_tx) = updates_tx {
3837 updates_tx
3838 .unbounded_send(DownstreamUpdate::UpdateRepository(snapshot))
3839 .ok();
3840 }
3841 Ok(())
3842 },
3843 );
3844 }
3845
3846 fn spawn_local_git_worker(
3847 work_directory_abs_path: Arc<Path>,
3848 dot_git_abs_path: Arc<Path>,
3849 repository_dir_abs_path: Arc<Path>,
3850 common_dir_abs_path: Arc<Path>,
3851 project_environment: WeakEntity<ProjectEnvironment>,
3852 fs: Arc<dyn Fs>,
3853 cx: &mut Context<Self>,
3854 ) -> mpsc::UnboundedSender<GitJob> {
3855 let (job_tx, mut job_rx) = mpsc::unbounded::<GitJob>();
3856
3857 cx.spawn(async move |_, cx| {
3858 let environment = project_environment
3859 .upgrade()
3860 .ok_or_else(|| anyhow!("missing project environment"))?
3861 .update(cx, |project_environment, cx| {
3862 project_environment.get_directory_environment(work_directory_abs_path.clone(), cx)
3863 })?
3864 .await
3865 .unwrap_or_else(|| {
3866 log::error!("failed to get working directory environment for repository {work_directory_abs_path:?}");
3867 HashMap::default()
3868 });
3869 let backend = cx
3870 .background_spawn(async move {
3871 fs.open_repo(&dot_git_abs_path)
3872 .ok_or_else(|| anyhow!("failed to build repository"))
3873 })
3874 .await?;
3875
3876 debug_assert_eq!(backend.path().as_path(), repository_dir_abs_path.as_ref());
3877 debug_assert_eq!(backend.main_repository_path().as_path(), common_dir_abs_path.as_ref());
3878
3879 if let Some(git_hosting_provider_registry) =
3880 cx.update(|cx| GitHostingProviderRegistry::try_global(cx))?
3881 {
3882 git_hosting_providers::register_additional_providers(
3883 git_hosting_provider_registry,
3884 backend.clone(),
3885 );
3886 }
3887
3888 let state = RepositoryState::Local {
3889 backend,
3890 environment: Arc::new(environment),
3891 };
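            // Drain everything that is already queued before dispatching, so
            // that a keyed job can be skipped when a newer job with the same
            // key is still waiting; bursts of redundant work (e.g. repeated
            // reloads) then collapse into the most recent request.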
3892 let mut jobs = VecDeque::new();
3893 loop {
3894 while let Ok(Some(next_job)) = job_rx.try_next() {
3895 jobs.push_back(next_job);
3896 }
3897
3898 if let Some(job) = jobs.pop_front() {
3899 if let Some(current_key) = &job.key {
3900 if jobs
3901 .iter()
3902 .any(|other_job| other_job.key.as_ref() == Some(current_key))
3903 {
3904 continue;
3905 }
3906 }
3907 (job.job)(state.clone(), cx).await;
3908 } else if let Some(job) = job_rx.next().await {
3909 jobs.push_back(job);
3910 } else {
3911 break;
3912 }
3913 }
3914 anyhow::Ok(())
3915 })
3916 .detach_and_log_err(cx);
3917
3918 job_tx
3919 }
3920
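    /// Spawns the background task for a repository that is accessed through a
    /// proto `client` rather than a local backend. Jobs are drained and
    /// deduplicated by key exactly as in the local worker.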
    fn spawn_remote_git_worker(
        project_id: ProjectId,
        client: AnyProtoClient,
        cx: &mut Context<Self>,
    ) -> mpsc::UnboundedSender<GitJob> {
        let (job_tx, mut job_rx) = mpsc::unbounded::<GitJob>();

        cx.spawn(async move |_, cx| {
            let state = RepositoryState::Remote { project_id, client };
            let mut jobs = VecDeque::new();
            loop {
                while let Ok(Some(next_job)) = job_rx.try_next() {
                    jobs.push_back(next_job);
                }

                if let Some(job) = jobs.pop_front() {
                    if let Some(current_key) = &job.key {
                        if jobs
                            .iter()
                            .any(|other_job| other_job.key.as_ref() == Some(current_key))
                        {
                            continue;
                        }
                    }
                    (job.job)(state.clone(), cx).await;
                } else if let Some(job) = job_rx.next().await {
                    jobs.push_back(job);
                } else {
                    break;
                }
            }
            anyhow::Ok(())
        })
        .detach_and_log_err(cx);

        job_tx
    }

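    /// Loads the index (staged) text for `repo_path`, reading it directly from
    /// the local backend or requesting it from the remote peer. Returns `None`
    /// when no staged text is available for the path.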
    fn load_staged_text(
        &mut self,
        buffer_id: BufferId,
        repo_path: RepoPath,
        cx: &App,
    ) -> Task<Result<Option<String>>> {
        let rx = self.send_job(None, move |state, _| async move {
            match state {
                RepositoryState::Local { backend, .. } => {
                    anyhow::Ok(backend.load_index_text(repo_path).await)
                }
                RepositoryState::Remote { project_id, client } => {
                    let response = client
                        .request(proto::OpenUnstagedDiff {
                            project_id: project_id.to_proto(),
                            buffer_id: buffer_id.to_proto(),
                        })
                        .await?;
                    Ok(response.staged_text)
                }
            }
        });
        cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
    }

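    /// Loads the HEAD and index texts for `repo_path` and folds them into a
    /// `DiffBasesChange`: `SetBoth` when the index matches HEAD, `SetEach`
    /// otherwise. For remote repositories the equivalent data is fetched with an
    /// `OpenUncommittedDiff` request.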
    fn load_committed_text(
        &mut self,
        buffer_id: BufferId,
        repo_path: RepoPath,
        cx: &App,
    ) -> Task<Result<DiffBasesChange>> {
        let rx = self.send_job(None, move |state, _| async move {
            match state {
                RepositoryState::Local { backend, .. } => {
                    let committed_text = backend.load_committed_text(repo_path.clone()).await;
                    let staged_text = backend.load_index_text(repo_path).await;
                    let diff_bases_change = if committed_text == staged_text {
                        DiffBasesChange::SetBoth(committed_text)
                    } else {
                        DiffBasesChange::SetEach {
                            index: staged_text,
                            head: committed_text,
                        }
                    };
                    anyhow::Ok(diff_bases_change)
                }
                RepositoryState::Remote { project_id, client } => {
                    use proto::open_uncommitted_diff_response::Mode;

                    let response = client
                        .request(proto::OpenUncommittedDiff {
                            project_id: project_id.to_proto(),
                            buffer_id: buffer_id.to_proto(),
                        })
                        .await?;
                    let mode =
                        Mode::from_i32(response.mode).ok_or_else(|| anyhow!("Invalid mode"))?;
                    let bases = match mode {
                        Mode::IndexMatchesHead => DiffBasesChange::SetBoth(response.committed_text),
                        Mode::IndexAndHead => DiffBasesChange::SetEach {
                            head: response.committed_text,
                            index: response.staged_text,
                        },
                    };
                    Ok(bases)
                }
            }
        });

        cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
    }

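    /// Records that the given paths may have changed on disk and schedules a
    /// keyed `RefreshStatuses` job that re-queries their git status, applies the
    /// resulting edits to the snapshot, and notifies downstream observers.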
    fn paths_changed(
        &mut self,
        paths: Vec<RepoPath>,
        updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
        cx: &mut Context<Self>,
    ) {
        self.paths_needing_status_update.extend(paths);

        let this = cx.weak_entity();
        let _ = self.send_keyed_job(
            Some(GitJobKey::RefreshStatuses),
            None,
            |state, mut cx| async move {
                let (prev_snapshot, mut changed_paths) = this.update(&mut cx, |this, _| {
                    (
                        this.snapshot.clone(),
                        mem::take(&mut this.paths_needing_status_update),
                    )
                })?;
                let RepositoryState::Local { backend, .. } = state else {
                    bail!("not a local repository")
                };

                let paths = changed_paths.iter().cloned().collect::<Vec<_>>();
                let statuses = backend.status(&paths).await?;

                let changed_path_statuses = cx
                    .background_spawn(async move {
                        let mut changed_path_statuses = Vec::new();
                        let prev_statuses = prev_snapshot.statuses_by_path.clone();
                        let mut cursor = prev_statuses.cursor::<PathProgress>(&());

                        // Insert an entry for every path whose reported status differs
                        // from the previous snapshot.
                        for (repo_path, status) in &*statuses.entries {
                            changed_paths.remove(repo_path);
                            if cursor.seek_forward(&PathTarget::Path(repo_path), Bias::Left, &()) {
                                if cursor.item().is_some_and(|entry| entry.status == *status) {
                                    continue;
                                }
                            }

                            changed_path_statuses.push(Edit::Insert(StatusEntry {
                                repo_path: repo_path.clone(),
                                status: *status,
                            }));
                        }
                        // Paths that were requested but no longer appear in the status
                        // output are removed from the snapshot.
                        let mut cursor = prev_statuses.cursor::<PathProgress>(&());
                        for path in changed_paths.iter() {
                            if cursor.seek_forward(&PathTarget::Path(&path), Bias::Left, &()) {
                                changed_path_statuses.push(Edit::Remove(PathKey(path.0.clone())));
                            }
                        }
                        changed_path_statuses
                    })
                    .await;

                this.update(&mut cx, |this, cx| {
                    if !changed_path_statuses.is_empty() {
                        this.snapshot
                            .statuses_by_path
                            .edit(changed_path_statuses, &());
                        this.snapshot.scan_id += 1;
                        if let Some(updates_tx) = updates_tx {
                            updates_tx
                                .unbounded_send(DownstreamUpdate::UpdateRepository(
                                    this.snapshot.clone(),
                                ))
                                .ok();
                        }
                    }
                    cx.emit(RepositoryEvent::Updated { full_scan: false });
                })
            },
        );
    }

    /// Returns information about a currently running git command, along with
    /// when it started, if any.
    pub fn current_job(&self) -> Option<JobInfo> {
        self.active_jobs.values().next().cloned()
    }

    pub fn barrier(&mut self) -> oneshot::Receiver<()> {
        self.send_job(None, |_, _| async {})
    }
}

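/// Builds a permalink for a file inside a crate that was unpacked from the Rust
/// registry, by locating `.cargo_vcs_info.json` and `Cargo.toml` in the file's
/// parent directories and combining the recorded commit SHA with the manifest's
/// `package.repository` URL.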
fn get_permalink_in_rust_registry_src(
    provider_registry: Arc<GitHostingProviderRegistry>,
    path: PathBuf,
    selection: Range<u32>,
) -> Result<url::Url> {
    #[derive(Deserialize)]
    struct CargoVcsGit {
        sha1: String,
    }

    #[derive(Deserialize)]
    struct CargoVcsInfo {
        git: CargoVcsGit,
        path_in_vcs: String,
    }

    #[derive(Deserialize)]
    struct CargoPackage {
        repository: String,
    }

    #[derive(Deserialize)]
    struct CargoToml {
        package: CargoPackage,
    }

    let Some((dir, cargo_vcs_info_json)) = path.ancestors().skip(1).find_map(|dir| {
        let json = std::fs::read_to_string(dir.join(".cargo_vcs_info.json")).ok()?;
        Some((dir, json))
    }) else {
        bail!("No .cargo_vcs_info.json found in parent directories")
    };
    let cargo_vcs_info = serde_json::from_str::<CargoVcsInfo>(&cargo_vcs_info_json)?;
    let cargo_toml = std::fs::read_to_string(dir.join("Cargo.toml"))?;
    let manifest = toml::from_str::<CargoToml>(&cargo_toml)?;
    let (provider, remote) = parse_git_remote_url(provider_registry, &manifest.package.repository)
        .ok_or_else(|| anyhow!("Failed to parse package.repository field of manifest"))?;
    let path = PathBuf::from(cargo_vcs_info.path_in_vcs).join(path.strip_prefix(dir).unwrap());
    let permalink = provider.build_permalink(
        remote,
        BuildPermalinkParams {
            sha: &cargo_vcs_info.git.sha1,
            path: &path.to_string_lossy(),
            selection: Some(selection),
        },
    );
    Ok(permalink)
}

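/// Converts an optional `git::blame::Blame` into its protobuf representation,
/// producing an empty response when no blame information is available.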
fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
    let Some(blame) = blame else {
        return proto::BlameBufferResponse {
            blame_response: None,
        };
    };

    let entries = blame
        .entries
        .into_iter()
        .map(|entry| proto::BlameEntry {
            sha: entry.sha.as_bytes().into(),
            start_line: entry.range.start,
            end_line: entry.range.end,
            original_line_number: entry.original_line_number,
            author: entry.author.clone(),
            author_mail: entry.author_mail.clone(),
            author_time: entry.author_time,
            author_tz: entry.author_tz.clone(),
            committer: entry.committer_name.clone(),
            committer_mail: entry.committer_email.clone(),
            committer_time: entry.committer_time,
            committer_tz: entry.committer_tz.clone(),
            summary: entry.summary.clone(),
            previous: entry.previous.clone(),
            filename: entry.filename.clone(),
        })
        .collect::<Vec<_>>();

    let messages = blame
        .messages
        .into_iter()
        .map(|(oid, message)| proto::CommitMessage {
            oid: oid.as_bytes().into(),
            message,
        })
        .collect::<Vec<_>>();

    proto::BlameBufferResponse {
        blame_response: Some(proto::blame_buffer_response::BlameResponse {
            entries,
            messages,
            remote_url: blame.remote_url,
        }),
    }
}

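/// Reconstructs a `git::blame::Blame` from its protobuf representation, skipping
/// entries and commit messages whose SHAs fail to parse.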
fn deserialize_blame_buffer_response(
    response: proto::BlameBufferResponse,
) -> Option<git::blame::Blame> {
    let response = response.blame_response?;
    let entries = response
        .entries
        .into_iter()
        .filter_map(|entry| {
            Some(git::blame::BlameEntry {
                sha: git::Oid::from_bytes(&entry.sha).ok()?,
                range: entry.start_line..entry.end_line,
                original_line_number: entry.original_line_number,
                committer_name: entry.committer,
                committer_time: entry.committer_time,
                committer_tz: entry.committer_tz,
                committer_email: entry.committer_mail,
                author: entry.author,
                author_mail: entry.author_mail,
                author_time: entry.author_time,
                author_tz: entry.author_tz,
                summary: entry.summary,
                previous: entry.previous,
                filename: entry.filename,
            })
        })
        .collect::<Vec<_>>();

    let messages = response
        .messages
        .into_iter()
        .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
        .collect::<HashMap<_, _>>();

    Some(Blame {
        entries,
        messages,
        remote_url: response.remote_url,
    })
}

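/// Converts a `git::repository::Branch` into its protobuf representation.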
fn branch_to_proto(branch: &git::repository::Branch) -> proto::Branch {
    proto::Branch {
        is_head: branch.is_head,
        name: branch.name.to_string(),
        unix_timestamp: branch
            .most_recent_commit
            .as_ref()
            .map(|commit| commit.commit_timestamp as u64),
        upstream: branch.upstream.as_ref().map(|upstream| proto::GitUpstream {
            ref_name: upstream.ref_name.to_string(),
            tracking: upstream
                .tracking
                .status()
                .map(|upstream| proto::UpstreamTracking {
                    ahead: upstream.ahead as u64,
                    behind: upstream.behind as u64,
                }),
        }),
        most_recent_commit: branch
            .most_recent_commit
            .as_ref()
            .map(|commit| proto::CommitSummary {
                sha: commit.sha.to_string(),
                subject: commit.subject.to_string(),
                commit_timestamp: commit.commit_timestamp,
            }),
    }
}

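/// Converts a protobuf branch back into a `git::repository::Branch`. A missing
/// tracking status maps to `UpstreamTracking::Gone`, and `has_parent` is assumed
/// to be `true` for the most recent commit.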
fn proto_to_branch(proto: &proto::Branch) -> git::repository::Branch {
    git::repository::Branch {
        is_head: proto.is_head,
        name: proto.name.clone().into(),
        upstream: proto
            .upstream
            .as_ref()
            .map(|upstream| git::repository::Upstream {
                ref_name: upstream.ref_name.to_string().into(),
                tracking: upstream
                    .tracking
                    .as_ref()
                    .map(|tracking| {
                        git::repository::UpstreamTracking::Tracked(UpstreamTrackingStatus {
                            ahead: tracking.ahead as u32,
                            behind: tracking.behind as u32,
                        })
                    })
                    .unwrap_or(git::repository::UpstreamTracking::Gone),
            }),
        most_recent_commit: proto.most_recent_commit.as_ref().map(|commit| {
            git::repository::CommitSummary {
                sha: commit.sha.to_string().into(),
                subject: commit.subject.to_string().into(),
                commit_timestamp: commit.commit_timestamp,
                has_parent: true,
            }
        }),
    }
}

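/// Builds a fresh `RepositorySnapshot` by querying the backend for the current
/// branch, statuses, merge message, and merge heads, and compares the result with
/// `prev_snapshot` to decide which `RepositoryEvent`s to emit.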
async fn compute_snapshot(
    id: RepositoryId,
    work_directory_abs_path: Arc<Path>,
    prev_snapshot: RepositorySnapshot,
    backend: Arc<dyn GitRepository>,
) -> Result<(RepositorySnapshot, Vec<RepositoryEvent>)> {
    let mut events = Vec::new();
    let branches = backend.branches().await?;
    let branch = branches.into_iter().find(|branch| branch.is_head);
    let statuses = backend.status(&[WORK_DIRECTORY_REPO_PATH.clone()]).await?;
    let merge_message = backend
        .merge_message()
        .await
        .and_then(|msg| Some(msg.lines().next()?.to_owned().into()));
    let merge_head_shas = backend
        .merge_head_shas()
        .into_iter()
        .map(SharedString::from)
        .collect();

    let statuses_by_path = SumTree::from_iter(
        statuses
            .entries
            .iter()
            .map(|(repo_path, status)| StatusEntry {
                repo_path: repo_path.clone(),
                status: *status,
            }),
        &(),
    );

    let merge_head_shas_changed = merge_head_shas != prev_snapshot.merge_head_shas;

    if merge_head_shas_changed
        || branch != prev_snapshot.branch
        || statuses_by_path != prev_snapshot.statuses_by_path
    {
        events.push(RepositoryEvent::Updated { full_scan: true });
    }

    let mut current_merge_conflicts = TreeSet::default();
    for (repo_path, status) in statuses.entries.iter() {
        if status.is_conflicted() {
            current_merge_conflicts.insert(repo_path.clone());
        }
    }

    // Cache the merge-conflict paths so that staging or unstaging doesn't change
    // them; they are only recomputed when the merge heads change (at commit time,
    // etc.).
    let mut merge_conflicts = prev_snapshot.merge_conflicts.clone();
    if merge_head_shas_changed {
        merge_conflicts = current_merge_conflicts;
        events.push(RepositoryEvent::MergeHeadsChanged);
    }

    let snapshot = RepositorySnapshot {
        id,
        merge_message,
        statuses_by_path,
        work_directory_abs_path,
        scan_id: prev_snapshot.scan_id + 1,
        branch,
        merge_conflicts,
        merge_head_shas,
    };

    Ok((snapshot, events))
}

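/// Decodes a `FileStatus` from its protobuf form, falling back to the flat
/// `simple_status` code when no structured variant is present.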
fn status_from_proto(
    simple_status: i32,
    status: Option<proto::GitFileStatus>,
) -> anyhow::Result<FileStatus> {
    use proto::git_file_status::Variant;

    let Some(variant) = status.and_then(|status| status.variant) else {
        let code = proto::GitStatus::from_i32(simple_status)
            .ok_or_else(|| anyhow!("Invalid git status code: {simple_status}"))?;
        let result = match code {
            proto::GitStatus::Added => TrackedStatus {
                worktree_status: StatusCode::Added,
                index_status: StatusCode::Unmodified,
            }
            .into(),
            proto::GitStatus::Modified => TrackedStatus {
                worktree_status: StatusCode::Modified,
                index_status: StatusCode::Unmodified,
            }
            .into(),
            proto::GitStatus::Conflict => UnmergedStatus {
                first_head: UnmergedStatusCode::Updated,
                second_head: UnmergedStatusCode::Updated,
            }
            .into(),
            proto::GitStatus::Deleted => TrackedStatus {
                worktree_status: StatusCode::Deleted,
                index_status: StatusCode::Unmodified,
            }
            .into(),
            _ => return Err(anyhow!("Invalid code for simple status: {simple_status}")),
        };
        return Ok(result);
    };

    let result = match variant {
        Variant::Untracked(_) => FileStatus::Untracked,
        Variant::Ignored(_) => FileStatus::Ignored,
        Variant::Unmerged(unmerged) => {
            let [first_head, second_head] =
                [unmerged.first_head, unmerged.second_head].map(|head| {
                    let code = proto::GitStatus::from_i32(head)
                        .ok_or_else(|| anyhow!("Invalid git status code: {head}"))?;
                    let result = match code {
                        proto::GitStatus::Added => UnmergedStatusCode::Added,
                        proto::GitStatus::Updated => UnmergedStatusCode::Updated,
                        proto::GitStatus::Deleted => UnmergedStatusCode::Deleted,
                        _ => return Err(anyhow!("Invalid code for unmerged status: {code:?}")),
                    };
                    Ok(result)
                });
            let [first_head, second_head] = [first_head?, second_head?];
            UnmergedStatus {
                first_head,
                second_head,
            }
            .into()
        }
        Variant::Tracked(tracked) => {
            let [index_status, worktree_status] = [tracked.index_status, tracked.worktree_status]
                .map(|status| {
                    let code = proto::GitStatus::from_i32(status)
                        .ok_or_else(|| anyhow!("Invalid git status code: {status}"))?;
                    let result = match code {
                        proto::GitStatus::Modified => StatusCode::Modified,
                        proto::GitStatus::TypeChanged => StatusCode::TypeChanged,
                        proto::GitStatus::Added => StatusCode::Added,
                        proto::GitStatus::Deleted => StatusCode::Deleted,
                        proto::GitStatus::Renamed => StatusCode::Renamed,
                        proto::GitStatus::Copied => StatusCode::Copied,
                        proto::GitStatus::Unmodified => StatusCode::Unmodified,
                        _ => return Err(anyhow!("Invalid code for tracked status: {code:?}")),
                    };
                    Ok(result)
                });
            let [index_status, worktree_status] = [index_status?, worktree_status?];
            TrackedStatus {
                index_status,
                worktree_status,
            }
            .into()
        }
    };
    Ok(result)
}

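/// Encodes a `FileStatus` as a structured protobuf variant (untracked, ignored,
/// unmerged, or tracked).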
fn status_to_proto(status: FileStatus) -> proto::GitFileStatus {
    use proto::git_file_status::{Tracked, Unmerged, Variant};

    let variant = match status {
        FileStatus::Untracked => Variant::Untracked(Default::default()),
        FileStatus::Ignored => Variant::Ignored(Default::default()),
        FileStatus::Unmerged(UnmergedStatus {
            first_head,
            second_head,
        }) => Variant::Unmerged(Unmerged {
            first_head: unmerged_status_to_proto(first_head),
            second_head: unmerged_status_to_proto(second_head),
        }),
        FileStatus::Tracked(TrackedStatus {
            index_status,
            worktree_status,
        }) => Variant::Tracked(Tracked {
            index_status: tracked_status_to_proto(index_status),
            worktree_status: tracked_status_to_proto(worktree_status),
        }),
    };
    proto::GitFileStatus {
        variant: Some(variant),
    }
}

fn unmerged_status_to_proto(code: UnmergedStatusCode) -> i32 {
    match code {
        UnmergedStatusCode::Added => proto::GitStatus::Added as _,
        UnmergedStatusCode::Deleted => proto::GitStatus::Deleted as _,
        UnmergedStatusCode::Updated => proto::GitStatus::Updated as _,
    }
}

fn tracked_status_to_proto(code: StatusCode) -> i32 {
    match code {
        StatusCode::Added => proto::GitStatus::Added as _,
        StatusCode::Deleted => proto::GitStatus::Deleted as _,
        StatusCode::Modified => proto::GitStatus::Modified as _,
        StatusCode::Renamed => proto::GitStatus::Renamed as _,
        StatusCode::TypeChanged => proto::GitStatus::TypeChanged as _,
        StatusCode::Copied => proto::GitStatus::Copied as _,
        StatusCode::Unmodified => proto::GitStatus::Unmodified as _,
    }
}