1pub mod branch_diff;
2mod conflict_set;
3pub mod git_traversal;
4pub mod pending_op;
5
6use crate::{
7 ProjectEnvironment, ProjectItem, ProjectPath,
8 buffer_store::{BufferStore, BufferStoreEvent},
9 worktree_store::{WorktreeStore, WorktreeStoreEvent},
10};
11use anyhow::{Context as _, Result, anyhow, bail};
12use askpass::{AskPassDelegate, EncryptedPassword, IKnowWhatIAmDoingAndIHaveReadTheDocs};
13use buffer_diff::{BufferDiff, BufferDiffEvent};
14use client::ProjectId;
15use collections::HashMap;
16pub use conflict_set::{ConflictRegion, ConflictSet, ConflictSetSnapshot, ConflictSetUpdate};
17use fs::Fs;
18use futures::{
19 FutureExt, StreamExt,
20 channel::{
21 mpsc,
22 oneshot::{self, Canceled},
23 },
24 future::{self, Shared},
25 stream::FuturesOrdered,
26};
27use git::{
28 BuildPermalinkParams, GitHostingProviderRegistry, Oid, RunHook,
29 blame::Blame,
30 parse_git_remote_url,
31 repository::{
32 Branch, CommitDetails, CommitDiff, CommitFile, CommitOptions, DiffType, FetchOptions,
33 GitRepository, GitRepositoryCheckpoint, GraphCommitData, InitialGraphCommitData, LogOrder,
34 LogSource, PushOptions, Remote, RemoteCommandOutput, RepoPath, ResetMode,
35 UpstreamTrackingStatus, Worktree as GitWorktree,
36 },
37 stash::{GitStash, StashEntry},
38 status::{
39 DiffTreeType, FileStatus, GitSummary, StatusCode, TrackedStatus, TreeDiff, TreeDiffStatus,
40 UnmergedStatus, UnmergedStatusCode,
41 },
42};
43use gpui::{
44 App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Subscription, Task,
45 WeakEntity,
46};
47use language::{
48 Buffer, BufferEvent, Language, LanguageRegistry,
49 proto::{deserialize_version, serialize_version},
50};
51use parking_lot::Mutex;
52use pending_op::{PendingOp, PendingOpId, PendingOps, PendingOpsSummary};
53use postage::stream::Stream as _;
54use rpc::{
55 AnyProtoClient, TypedEnvelope,
56 proto::{self, git_reset, split_repository_update},
57};
58use serde::Deserialize;
59use settings::WorktreeId;
60use smol::future::yield_now;
61use std::{
62 cmp::Ordering,
63 collections::{BTreeSet, HashSet, VecDeque, hash_map::Entry},
64 future::Future,
65 mem,
66 ops::Range,
67 path::{Path, PathBuf},
68 str::FromStr,
69 sync::{
70 Arc,
71 atomic::{self, AtomicU64},
72 },
73 time::Instant,
74};
75use sum_tree::{Edit, SumTree, TreeSet};
76use task::Shell;
77use text::{Bias, BufferId};
78use util::{
79 ResultExt, debug_panic,
80 paths::{PathStyle, SanitizedPath},
81 post_inc,
82 rel_path::RelPath,
83};
84use worktree::{
85 File, PathChange, PathKey, PathProgress, PathSummary, PathTarget, ProjectEntryId,
86 UpdatedGitRepositoriesSet, UpdatedGitRepository, Worktree,
87};
88use zeroize::Zeroize;
89
/// Central store for git state within a project: tracks every discovered
/// repository, per-buffer diff state, and local-vs-remote replication state.
pub struct GitStore {
    state: GitStoreState,
    buffer_store: Entity<BufferStore>,
    worktree_store: Entity<WorktreeStore>,
    /// All repositories known to this store, keyed by id.
    repositories: HashMap<RepositoryId, Entity<Repository>>,
    /// Which worktrees each repository is associated with.
    worktree_ids: HashMap<RepositoryId, HashSet<WorktreeId>>,
    /// The currently active repository, if any (see `set_active_repo_for_path`).
    active_repo_id: Option<RepositoryId>,
    // In-flight diff-loading tasks, shared so concurrent requests for the
    // same (buffer, kind) pair reuse a single load.
    #[allow(clippy::type_complexity)]
    loading_diffs:
        HashMap<(BufferId, DiffKind), Shared<Task<Result<Entity<BufferDiff>, Arc<anyhow::Error>>>>>,
    /// Per-buffer git state (open diffs, conflict set, cached base texts).
    diffs: HashMap<BufferId, Entity<BufferGitState>>,
    /// Diffs shared with downstream peers, per peer and buffer.
    shared_diffs: HashMap<proto::PeerId, HashMap<BufferId, SharedDiffs>>,
    _subscriptions: Vec<Subscription>,
}
104
/// The diff entities that have been shared with a downstream peer for a
/// single buffer. Holding strong `Entity` handles keeps them alive while shared.
#[derive(Default)]
struct SharedDiffs {
    unstaged: Option<Entity<BufferDiff>>,
    uncommitted: Option<Entity<BufferDiff>>,
}
110
/// Per-buffer git state: the diffs opened against a buffer, its conflict-marker
/// set, and the cached base texts those diffs were computed from.
struct BufferGitState {
    unstaged_diff: Option<WeakEntity<BufferDiff>>,
    uncommitted_diff: Option<WeakEntity<BufferDiff>>,
    /// Diffs against arbitrary commits, keyed by OID (`None` = empty base).
    oid_diffs: HashMap<Option<git::Oid>, WeakEntity<BufferDiff>>,
    conflict_set: Option<WeakEntity<ConflictSet>>,
    recalculate_diff_task: Option<Task<Result<()>>>,
    reparse_conflict_markers_task: Option<Task<Result<()>>>,
    language: Option<Arc<Language>>,
    language_registry: Option<Arc<LanguageRegistry>>,
    /// Senders notified once the next conflict-marker reparse completes.
    conflict_updated_futures: Vec<oneshot::Sender<()>>,
    /// Watch channel signalling whether a diff recalculation is in flight.
    recalculating_tx: postage::watch::Sender<bool>,

    /// These operation counts are used to ensure that head and index text
    /// values read from the git repository are up-to-date with any hunk staging
    /// operations that have been performed on the BufferDiff.
    ///
    /// The operation count is incremented immediately when the user initiates a
    /// hunk stage/unstage operation. Then, upon finishing writing the new index
    /// text to disk, the `operation count as of write` is updated to reflect
    /// the operation count that prompted the write.
    hunk_staging_operation_count: usize,
    hunk_staging_operation_count_as_of_write: usize,

    head_text: Option<Arc<str>>,
    index_text: Option<Arc<str>>,
    /// Cached base texts for `oid_diffs`, keyed by commit OID.
    oid_texts: HashMap<git::Oid, Arc<str>>,
    head_changed: bool,
    index_changed: bool,
    /// Set when `language`/`language_registry` changed since the last recalculation.
    language_changed: bool,
}
141
/// Describes which diff base texts changed and their new contents
/// (`None` meaning "no base text", e.g. for an untracked file).
#[derive(Clone, Debug)]
enum DiffBasesChange {
    /// Only the index (staged) text changed.
    SetIndex(Option<String>),
    /// Only the HEAD (committed) text changed.
    SetHead(Option<String>),
    /// Index and HEAD changed to different values.
    SetEach {
        index: Option<String>,
        head: Option<String>,
    },
    /// Index and HEAD changed to the same value.
    SetBoth(Option<String>),
}

/// The kind of diff tracked for a buffer.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum DiffKind {
    /// Buffer contents vs. the staged (index) text.
    Unstaged,
    /// Buffer contents vs. the committed (HEAD) text.
    Uncommitted,
    /// Buffer contents vs. an arbitrary commit (`None` = empty base).
    SinceOid(Option<git::Oid>),
}
159
/// Whether this store operates directly on a local filesystem or proxies to
/// an upstream collaborator, plus any downstream share state.
enum GitStoreState {
    Local {
        /// Source of ids for newly discovered repositories.
        next_repository_id: Arc<AtomicU64>,
        /// Present while this project is shared with downstream peers.
        downstream: Option<LocalDownstreamState>,
        project_environment: Entity<ProjectEnvironment>,
        fs: Arc<dyn Fs>,
    },
    Remote {
        upstream_client: AnyProtoClient,
        upstream_project_id: u64,
        /// Present while this (already remote) project is re-shared downstream.
        downstream: Option<(AnyProtoClient, ProjectId)>,
    },
}

/// A unit of repository-state replication to be sent downstream.
enum DownstreamUpdate {
    UpdateRepository(RepositorySnapshot),
    RemoveRepository(RepositoryId),
}

/// Downstream-share state for a locally-hosted project.
struct LocalDownstreamState {
    client: AnyProtoClient,
    project_id: ProjectId,
    /// Queue of snapshots/removals drained by `_task` and sent via `client`.
    updates_tx: mpsc::UnboundedSender<DownstreamUpdate>,
    _task: Task<Result<()>>,
}
185
/// A checkpoint of every repository in the store, keyed by working-directory
/// path, used to later restore or compare project-wide git state.
#[derive(Clone, Debug)]
pub struct GitStoreCheckpoint {
    checkpoints_by_work_dir_abs_path: HashMap<Arc<Path>, GitRepositoryCheckpoint>,
}

/// A path within a repository together with its git status.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StatusEntry {
    pub repo_path: RepoPath,
    pub status: FileStatus,
}
196
197impl StatusEntry {
198 fn to_proto(&self) -> proto::StatusEntry {
199 let simple_status = match self.status {
200 FileStatus::Ignored | FileStatus::Untracked => proto::GitStatus::Added as i32,
201 FileStatus::Unmerged { .. } => proto::GitStatus::Conflict as i32,
202 FileStatus::Tracked(TrackedStatus {
203 index_status,
204 worktree_status,
205 }) => tracked_status_to_proto(if worktree_status != StatusCode::Unmodified {
206 worktree_status
207 } else {
208 index_status
209 }),
210 };
211
212 proto::StatusEntry {
213 repo_path: self.repo_path.to_proto(),
214 simple_status,
215 status: Some(status_to_proto(self.status)),
216 }
217 }
218}
219
220impl TryFrom<proto::StatusEntry> for StatusEntry {
221 type Error = anyhow::Error;
222
223 fn try_from(value: proto::StatusEntry) -> Result<Self, Self::Error> {
224 let repo_path = RepoPath::from_proto(&value.repo_path).context("invalid repo path")?;
225 let status = status_from_proto(value.simple_status, value.status)?;
226 Ok(Self { repo_path, status })
227 }
228}
229
/// Lets `StatusEntry`s live in a `SumTree` ordered by path, rolling each
/// entry's status up into a `GitSummary` for aggregate queries.
impl sum_tree::Item for StatusEntry {
    type Summary = PathSummary<GitSummary>;

    fn summary(&self, _: <Self::Summary as sum_tree::Summary>::Context<'_>) -> Self::Summary {
        PathSummary {
            max_path: self.repo_path.as_ref().clone(),
            item_summary: self.status.summary(),
        }
    }
}

/// Entries are keyed (and therefore ordered/replaced) by their repo path.
impl sum_tree::KeyedItem for StatusEntry {
    type Key = PathKey;

    fn key(&self) -> Self::Key {
        PathKey(self.repo_path.as_ref().clone())
    }
}
248
/// Stable identifier for a repository within a project.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct RepositoryId(pub u64);

/// State of an in-progress merge, if any.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct MergeDetails {
    /// Paths currently in a conflicted (unmerged) state.
    pub conflicted_paths: TreeSet<RepoPath>,
    /// The prepared merge message, if one exists.
    pub message: Option<SharedString>,
    // Merge heads; `None` entries presumably represent heads that could not
    // be resolved — TODO confirm against the producer of this value.
    pub heads: Vec<Option<SharedString>>,
}

/// Commit data for the git graph, which is loaded lazily.
#[derive(Clone)]
pub enum CommitDataState {
    Loading,
    Loaded(Arc<GraphCommitData>),
}
264
/// An immutable, cloneable view of a repository's state: statuses, branch,
/// head commit, merge state, remote URLs, and stash entries.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RepositorySnapshot {
    pub id: RepositoryId,
    /// Git status per path, ordered for efficient range/summary queries.
    pub statuses_by_path: SumTree<StatusEntry>,
    pub work_directory_abs_path: Arc<Path>,
    pub path_style: PathStyle,
    pub branch: Option<Branch>,
    pub head_commit: Option<CommitDetails>,
    // Presumably incremented on each status scan — TODO confirm with the
    // scanning code (not visible in this chunk).
    pub scan_id: u64,
    pub merge: MergeDetails,
    pub remote_origin_url: Option<String>,
    pub remote_upstream_url: Option<String>,
    pub stash_entries: GitStash,
}

/// Identifier for a git job tracked in `Repository::active_jobs`.
type JobId = u64;

/// Metadata about a running git job: when it started and a display message.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct JobInfo {
    pub start: Instant,
    pub message: SharedString,
}
287
/// Owns the background task that serves on-demand commit-data requests for
/// the git graph.
struct GraphCommitDataHandler {
    _task: Task<()>,
    /// Channel used to request data for a specific commit.
    commit_data_request: smol::channel::Sender<Oid>,
}

/// Lifecycle of the commit-data handler task.
enum GraphCommitHandlerState {
    Starting,
    Open(GraphCommitDataHandler),
    Closed,
}

/// Incrementally fetched commit data for one git-graph view (identified by a
/// `(LogSource, LogOrder)` pair).
pub struct InitialGitGraphData {
    fetch_task: Task<()>,
    pub error: Option<SharedString>,
    pub commit_data: Vec<Arc<InitialGraphCommitData>>,
    /// Maps a commit OID to its index within `commit_data`.
    pub commit_oid_to_index: HashMap<Oid, usize>,
}

/// Borrowed view of graph data handed back to callers.
pub struct GraphDataResponse<'a> {
    pub commits: &'a [Arc<InitialGraphCommitData>],
    pub is_loading: bool,
    pub error: Option<SharedString>,
}
311
/// Live state for a single git repository, layered on top of its latest
/// [`RepositorySnapshot`] (reachable directly through `Deref`).
pub struct Repository {
    this: WeakEntity<Self>,
    snapshot: RepositorySnapshot,
    commit_message_buffer: Option<Entity<Buffer>>,
    git_store: WeakEntity<GitStore>,
    // For a local repository, holds paths that have had worktree events since the last status scan completed,
    // and that should be examined during the next status scan.
    paths_needing_status_update: Vec<Vec<RepoPath>>,
    /// Queue of git jobs to run against this repository's backend.
    job_sender: mpsc::UnboundedSender<GitJob>,
    /// Jobs currently running, keyed by id, for progress reporting.
    active_jobs: HashMap<JobId, JobInfo>,
    pending_ops: SumTree<PendingOps>,
    // Presumably the next job id to allocate — TODO confirm with the job
    // dispatch code (not visible in this chunk).
    job_id: JobId,
    askpass_delegates: Arc<Mutex<HashMap<u64, AskPassDelegate>>>,
    latest_askpass_id: u64,
    /// Lazily resolved backend state (local repo handle or remote client).
    repository_state: Shared<Task<Result<RepositoryState, String>>>,
    initial_graph_data: HashMap<(LogSource, LogOrder), InitialGitGraphData>,
    graph_commit_data_handler: GraphCommitHandlerState,
    commit_data: HashMap<Oid, CommitDataState>,
}
331
/// Exposes the snapshot's fields directly on `Repository`.
impl std::ops::Deref for Repository {
    type Target = RepositorySnapshot;

    fn deref(&self) -> &Self::Target {
        &self.snapshot
    }
}
339
/// Backend state for a repository on the local filesystem.
#[derive(Clone)]
pub struct LocalRepositoryState {
    pub fs: Arc<dyn Fs>,
    pub backend: Arc<dyn GitRepository>,
    /// Shell environment resolved for the repository's working directory.
    pub environment: Arc<HashMap<String, String>>,
}
346
impl LocalRepositoryState {
    /// Opens the git repository at `dot_git_abs_path`.
    ///
    /// First resolves the shell environment for `work_directory_abs_path`
    /// (falling back to an empty environment on failure), then uses that
    /// environment's `PATH` to locate a system `git` binary before opening
    /// the repository on a background thread.
    async fn new(
        work_directory_abs_path: Arc<Path>,
        dot_git_abs_path: Arc<Path>,
        project_environment: WeakEntity<ProjectEnvironment>,
        fs: Arc<dyn Fs>,
        cx: &mut AsyncApp,
    ) -> anyhow::Result<Self> {
        let environment = project_environment
            .update(cx, |project_environment, cx| {
                project_environment.local_directory_environment(&Shell::System, work_directory_abs_path.clone(), cx)
            })?
            .await
            .unwrap_or_else(|| {
                log::error!("failed to get working directory environment for repository {work_directory_abs_path:?}");
                HashMap::default()
            });
        let search_paths = environment.get("PATH").map(|val| val.to_owned());
        let backend = cx
            .background_spawn({
                let fs = fs.clone();
                async move {
                    // Prefer a `git` found on the directory's resolved PATH;
                    // fall back to whatever is on the process's own PATH.
                    let system_git_binary_path = search_paths
                        .and_then(|search_paths| {
                            which::which_in("git", Some(search_paths), &work_directory_abs_path)
                                .ok()
                        })
                        .or_else(|| which::which("git").ok());
                    fs.open_repo(&dot_git_abs_path, system_git_binary_path.as_deref())
                        .with_context(|| format!("opening repository at {dot_git_abs_path:?}"))
                }
            })
            .await?;
        Ok(LocalRepositoryState {
            backend,
            environment: Arc::new(environment),
            fs,
        })
    }
}
387
/// Backend state for a repository hosted by an upstream collaborator.
#[derive(Clone)]
pub struct RemoteRepositoryState {
    pub project_id: ProjectId,
    pub client: AnyProtoClient,
}

/// Resolved backend for a repository: a local git handle, or a proxy to the
/// upstream project.
#[derive(Clone)]
pub enum RepositoryState {
    Local(LocalRepositoryState),
    Remote(RemoteRepositoryState),
}
399
/// Events emitted while loading data for a git-graph view.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum GitGraphEvent {
    /// The number of known commits changed.
    CountUpdated(usize),
    FullyLoaded,
    LoadingError,
}

/// Events emitted by a [`Repository`] when parts of its state change.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RepositoryEvent {
    StatusesChanged,
    MergeHeadsChanged,
    BranchChanged,
    StashEntriesChanged,
    PendingOpsChanged { pending_ops: SumTree<PendingOps> },
    /// A git-graph loading event for the given `(source, order)` view.
    GraphEvent((LogSource, LogOrder), GitGraphEvent),
}

/// Marker event indicating the set of active git jobs changed.
#[derive(Clone, Debug)]
pub struct JobsUpdated;

/// Events emitted by the [`GitStore`] itself.
#[derive(Debug)]
pub enum GitStoreEvent {
    ActiveRepositoryChanged(Option<RepositoryId>),
    /// Bool is true when the repository that's updated is the active repository
    RepositoryUpdated(RepositoryId, RepositoryEvent, bool),
    RepositoryAdded,
    RepositoryRemoved(RepositoryId),
    IndexWriteError(anyhow::Error),
    JobsUpdated,
    ConflictsUpdated,
}
431
// Event wiring: repositories emit fine-grained `RepositoryEvent`s plus
// `JobsUpdated` notifications; the store emits `GitStoreEvent`s.
impl EventEmitter<RepositoryEvent> for Repository {}
impl EventEmitter<JobsUpdated> for Repository {}
impl EventEmitter<GitStoreEvent> for GitStore {}
435
/// A unit of git work that runs against the repository's resolved backend.
pub struct GitJob {
    job: Box<dyn FnOnce(RepositoryState, &mut AsyncApp) -> Task<()>>,
    // Presumably used to coalesce or serialize jobs of the same kind —
    // TODO confirm with the job-queue code (not visible in this chunk).
    key: Option<GitJobKey>,
}

/// Keys identifying classes of git jobs.
#[derive(PartialEq, Eq)]
enum GitJobKey {
    WriteIndex(Vec<RepoPath>),
    ReloadBufferDiffBases,
    RefreshStatuses,
    ReloadGitState,
}
448
449impl GitStore {
450 pub fn local(
451 worktree_store: &Entity<WorktreeStore>,
452 buffer_store: Entity<BufferStore>,
453 environment: Entity<ProjectEnvironment>,
454 fs: Arc<dyn Fs>,
455 cx: &mut Context<Self>,
456 ) -> Self {
457 Self::new(
458 worktree_store.clone(),
459 buffer_store,
460 GitStoreState::Local {
461 next_repository_id: Arc::new(AtomicU64::new(1)),
462 downstream: None,
463 project_environment: environment,
464 fs,
465 },
466 cx,
467 )
468 }
469
470 pub fn remote(
471 worktree_store: &Entity<WorktreeStore>,
472 buffer_store: Entity<BufferStore>,
473 upstream_client: AnyProtoClient,
474 project_id: u64,
475 cx: &mut Context<Self>,
476 ) -> Self {
477 Self::new(
478 worktree_store.clone(),
479 buffer_store,
480 GitStoreState::Remote {
481 upstream_client,
482 upstream_project_id: project_id,
483 downstream: None,
484 },
485 cx,
486 )
487 }
488
489 fn new(
490 worktree_store: Entity<WorktreeStore>,
491 buffer_store: Entity<BufferStore>,
492 state: GitStoreState,
493 cx: &mut Context<Self>,
494 ) -> Self {
495 let _subscriptions = vec![
496 cx.subscribe(&worktree_store, Self::on_worktree_store_event),
497 cx.subscribe(&buffer_store, Self::on_buffer_store_event),
498 ];
499
500 GitStore {
501 state,
502 buffer_store,
503 worktree_store,
504 repositories: HashMap::default(),
505 worktree_ids: HashMap::default(),
506 active_repo_id: None,
507 _subscriptions,
508 loading_diffs: HashMap::default(),
509 shared_diffs: HashMap::default(),
510 diffs: HashMap::default(),
511 }
512 }
513
    /// Registers all git-related RPC handlers on `client`.
    pub fn init(client: &AnyProtoClient) {
        client.add_entity_request_handler(Self::handle_get_remotes);
        client.add_entity_request_handler(Self::handle_get_branches);
        client.add_entity_request_handler(Self::handle_get_default_branch);
        client.add_entity_request_handler(Self::handle_change_branch);
        client.add_entity_request_handler(Self::handle_create_branch);
        client.add_entity_request_handler(Self::handle_rename_branch);
        client.add_entity_request_handler(Self::handle_create_remote);
        client.add_entity_request_handler(Self::handle_remove_remote);
        client.add_entity_request_handler(Self::handle_delete_branch);
        client.add_entity_request_handler(Self::handle_git_init);
        client.add_entity_request_handler(Self::handle_push);
        client.add_entity_request_handler(Self::handle_pull);
        client.add_entity_request_handler(Self::handle_fetch);
        client.add_entity_request_handler(Self::handle_stage);
        client.add_entity_request_handler(Self::handle_unstage);
        client.add_entity_request_handler(Self::handle_stash);
        client.add_entity_request_handler(Self::handle_stash_pop);
        client.add_entity_request_handler(Self::handle_stash_apply);
        client.add_entity_request_handler(Self::handle_stash_drop);
        client.add_entity_request_handler(Self::handle_commit);
        client.add_entity_request_handler(Self::handle_run_hook);
        client.add_entity_request_handler(Self::handle_reset);
        client.add_entity_request_handler(Self::handle_show);
        client.add_entity_request_handler(Self::handle_load_commit_diff);
        client.add_entity_request_handler(Self::handle_file_history);
        client.add_entity_request_handler(Self::handle_checkout_files);
        client.add_entity_request_handler(Self::handle_open_commit_message_buffer);
        client.add_entity_request_handler(Self::handle_set_index_text);
        client.add_entity_request_handler(Self::handle_askpass);
        client.add_entity_request_handler(Self::handle_check_for_pushed_commits);
        client.add_entity_request_handler(Self::handle_git_diff);
        client.add_entity_request_handler(Self::handle_git_diff_stat);
        client.add_entity_request_handler(Self::handle_tree_diff);
        client.add_entity_request_handler(Self::handle_get_blob_content);
        client.add_entity_request_handler(Self::handle_open_unstaged_diff);
        client.add_entity_request_handler(Self::handle_open_uncommitted_diff);
        client.add_entity_message_handler(Self::handle_update_diff_bases);
        client.add_entity_request_handler(Self::handle_get_permalink_to_line);
        client.add_entity_request_handler(Self::handle_blame_buffer);
        client.add_entity_message_handler(Self::handle_update_repository);
        client.add_entity_message_handler(Self::handle_remove_repository);
        client.add_entity_request_handler(Self::handle_git_clone);
        client.add_entity_request_handler(Self::handle_get_worktrees);
        client.add_entity_request_handler(Self::handle_create_worktree);
    }
560
    /// True when this store operates directly on the local filesystem
    /// (as opposed to proxying to an upstream collaborator).
    pub fn is_local(&self) -> bool {
        matches!(self.state, GitStoreState::Local { .. })
    }
564 pub fn set_active_repo_for_path(&mut self, project_path: &ProjectPath, cx: &mut Context<Self>) {
565 if let Some((repo, _)) = self.repository_and_path_for_project_path(project_path, cx) {
566 let id = repo.read(cx).id;
567 if self.active_repo_id != Some(id) {
568 self.active_repo_id = Some(id);
569 cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
570 }
571 }
572 }
573
    /// Starts replicating repository state to a downstream peer.
    ///
    /// For a remote store, current snapshots are sent immediately and the
    /// client is recorded for future updates. For a local store, a background
    /// task drains an update queue, diffing each new snapshot against the one
    /// previously sent so only deltas go over the wire.
    pub fn shared(&mut self, project_id: u64, client: AnyProtoClient, cx: &mut Context<Self>) {
        match &mut self.state {
            GitStoreState::Remote {
                downstream: downstream_client,
                ..
            } => {
                for repo in self.repositories.values() {
                    let update = repo.read(cx).snapshot.initial_update(project_id);
                    for update in split_repository_update(update) {
                        client.send(update).log_err();
                    }
                }
                *downstream_client = Some((client, ProjectId(project_id)));
            }
            GitStoreState::Local {
                downstream: downstream_client,
                ..
            } => {
                // Snapshots already sent downstream, used to compute deltas.
                let mut snapshots = HashMap::default();
                let (updates_tx, mut updates_rx) = mpsc::unbounded();
                // Seed the queue with the current state of every repository.
                for repo in self.repositories.values() {
                    updates_tx
                        .unbounded_send(DownstreamUpdate::UpdateRepository(
                            repo.read(cx).snapshot.clone(),
                        ))
                        .ok();
                }
                *downstream_client = Some(LocalDownstreamState {
                    client: client.clone(),
                    project_id: ProjectId(project_id),
                    updates_tx,
                    _task: cx.spawn(async move |this, cx| {
                        cx.background_spawn(async move {
                            while let Some(update) = updates_rx.next().await {
                                match update {
                                    DownstreamUpdate::UpdateRepository(snapshot) => {
                                        if let Some(old_snapshot) = snapshots.get_mut(&snapshot.id)
                                        {
                                            // Seen before: send only the delta.
                                            let update =
                                                snapshot.build_update(old_snapshot, project_id);
                                            *old_snapshot = snapshot;
                                            for update in split_repository_update(update) {
                                                client.send(update)?;
                                            }
                                        } else {
                                            // First time: send a full initial update.
                                            let update = snapshot.initial_update(project_id);
                                            for update in split_repository_update(update) {
                                                client.send(update)?;
                                            }
                                            snapshots.insert(snapshot.id, snapshot);
                                        }
                                    }
                                    DownstreamUpdate::RemoveRepository(id) => {
                                        client.send(proto::RemoveRepository {
                                            project_id,
                                            id: id.to_proto(),
                                        })?;
                                    }
                                }
                            }
                            anyhow::Ok(())
                        })
                        .await
                        .ok();
                        // The send loop ended (sender dropped or a send
                        // failed): clear the downstream state.
                        this.update(cx, |this, _| {
                            if let GitStoreState::Local {
                                downstream: downstream_client,
                                ..
                            } = &mut this.state
                            {
                                downstream_client.take();
                            } else {
                                unreachable!("unshared called on remote store");
                            }
                        })
                    }),
                });
            }
        }
    }
654
655 pub fn unshared(&mut self, _cx: &mut Context<Self>) {
656 match &mut self.state {
657 GitStoreState::Local {
658 downstream: downstream_client,
659 ..
660 } => {
661 downstream_client.take();
662 }
663 GitStoreState::Remote {
664 downstream: downstream_client,
665 ..
666 } => {
667 downstream_client.take();
668 }
669 }
670 self.shared_diffs.clear();
671 }
672
    /// Drops the diff-sharing state for a peer (e.g. after it disconnects).
    pub(crate) fn forget_shared_diffs_for(&mut self, peer_id: &proto::PeerId) {
        self.shared_diffs.remove(peer_id);
    }
676
677 pub fn active_repository(&self) -> Option<Entity<Repository>> {
678 self.active_repo_id
679 .as_ref()
680 .map(|id| self.repositories[id].clone())
681 }
682
    /// Returns (opening if necessary) the diff between `buffer`'s contents
    /// and its staged (index) text.
    ///
    /// Concurrent calls for the same buffer share one loading task; if a
    /// recalculation is already in flight, the returned task waits for it.
    pub fn open_unstaged_diff(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<BufferDiff>>> {
        let buffer_id = buffer.read(cx).remote_id();
        // Fast path: a live unstaged diff already exists for this buffer.
        if let Some(diff_state) = self.diffs.get(&buffer_id)
            && let Some(unstaged_diff) = diff_state
                .read(cx)
                .unstaged_diff
                .as_ref()
                .and_then(|weak| weak.upgrade())
        {
            if let Some(task) =
                diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
            {
                return cx.background_executor().spawn(async move {
                    task.await;
                    Ok(unstaged_diff)
                });
            }
            return Task::ready(Ok(unstaged_diff));
        }

        let Some((repo, repo_path)) =
            self.repository_and_path_for_buffer_id(buffer.read(cx).remote_id(), cx)
        else {
            return Task::ready(Err(anyhow!("failed to find git repository for buffer")));
        };

        // Deduplicate concurrent loads via a shared task keyed by (buffer, kind).
        let task = self
            .loading_diffs
            .entry((buffer_id, DiffKind::Unstaged))
            .or_insert_with(|| {
                let staged_text = repo.update(cx, |repo, cx| {
                    repo.load_staged_text(buffer_id, repo_path, cx)
                });
                cx.spawn(async move |this, cx| {
                    Self::open_diff_internal(
                        this,
                        DiffKind::Unstaged,
                        staged_text.await.map(DiffBasesChange::SetIndex),
                        buffer,
                        cx,
                    )
                    .await
                    .map_err(Arc::new)
                })
                .shared()
            })
            .clone();

        cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
737
    /// Returns (opening if necessary) a diff between `buffer`'s contents and
    /// its text at commit `oid` (`None` diffs against an empty base).
    ///
    /// The unstaged diff is attached as the new diff's secondary diff.
    pub fn open_diff_since(
        &mut self,
        oid: Option<git::Oid>,
        buffer: Entity<Buffer>,
        repo: Entity<Repository>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<BufferDiff>>> {
        let buffer_id = buffer.read(cx).remote_id();

        // Fast path: a live diff for this (buffer, oid) already exists.
        if let Some(diff_state) = self.diffs.get(&buffer_id)
            && let Some(oid_diff) = diff_state.read(cx).oid_diff(oid)
        {
            if let Some(task) =
                diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
            {
                return cx.background_executor().spawn(async move {
                    task.await;
                    Ok(oid_diff)
                });
            }
            return Task::ready(Ok(oid_diff));
        }

        // A load for this diff is already in flight; piggy-back on it.
        let diff_kind = DiffKind::SinceOid(oid);
        if let Some(task) = self.loading_diffs.get(&(buffer_id, diff_kind)) {
            let task = task.clone();
            return cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) });
        }

        let task = cx
            .spawn(async move |this, cx| {
                let result: Result<Entity<BufferDiff>> = async {
                    let buffer_snapshot = buffer.update(cx, |buffer, _| buffer.snapshot());
                    let language_registry =
                        buffer.update(cx, |buffer, _| buffer.language_registry());
                    // Load the base text for the requested commit, if any.
                    let content: Option<Arc<str>> = match oid {
                        None => None,
                        Some(oid) => Some(
                            repo.update(cx, |repo, cx| repo.load_blob_content(oid, cx))
                                .await?
                                .into(),
                        ),
                    };
                    let buffer_diff = cx.new(|cx| BufferDiff::new(&buffer_snapshot, cx));

                    buffer_diff
                        .update(cx, |buffer_diff, cx| {
                            buffer_diff.language_changed(
                                buffer_snapshot.language().cloned(),
                                language_registry,
                                cx,
                            );
                            buffer_diff.set_base_text(
                                content.clone(),
                                buffer_snapshot.language().cloned(),
                                buffer_snapshot.text,
                                cx,
                            )
                        })
                        .await?;
                    // Attach the unstaged diff as the secondary diff.
                    let unstaged_diff = this
                        .update(cx, |this, cx| this.open_unstaged_diff(buffer.clone(), cx))?
                        .await?;
                    buffer_diff.update(cx, |buffer_diff, _| {
                        buffer_diff.set_secondary_diff(unstaged_diff);
                    });

                    this.update(cx, |this, cx| {
                        cx.subscribe(&buffer_diff, Self::on_buffer_diff_event)
                            .detach();

                        this.loading_diffs.remove(&(buffer_id, diff_kind));

                        let git_store = cx.weak_entity();
                        let diff_state = this
                            .diffs
                            .entry(buffer_id)
                            .or_insert_with(|| cx.new(|_| BufferGitState::new(git_store)));

                        // Cache the base text and register the new diff.
                        diff_state.update(cx, |state, _| {
                            if let Some(oid) = oid {
                                if let Some(content) = content {
                                    state.oid_texts.insert(oid, content);
                                }
                            }
                            state.oid_diffs.insert(oid, buffer_diff.downgrade());
                        });
                    })?;

                    Ok(buffer_diff)
                }
                .await;
                result.map_err(Arc::new)
            })
            .shared();

        self.loading_diffs
            .insert((buffer_id, diff_kind), task.clone());
        cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
838
    /// Returns (opening if necessary) the diff between `buffer`'s contents
    /// and its committed (HEAD) text.
    ///
    /// Concurrent calls for the same buffer share one loading task; if a
    /// recalculation is already in flight, the returned task waits for it.
    #[ztracing::instrument(skip_all)]
    pub fn open_uncommitted_diff(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<BufferDiff>>> {
        let buffer_id = buffer.read(cx).remote_id();

        // Fast path: a live uncommitted diff already exists for this buffer.
        if let Some(diff_state) = self.diffs.get(&buffer_id)
            && let Some(uncommitted_diff) = diff_state
                .read(cx)
                .uncommitted_diff
                .as_ref()
                .and_then(|weak| weak.upgrade())
        {
            if let Some(task) =
                diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
            {
                return cx.background_executor().spawn(async move {
                    task.await;
                    Ok(uncommitted_diff)
                });
            }
            return Task::ready(Ok(uncommitted_diff));
        }

        let Some((repo, repo_path)) =
            self.repository_and_path_for_buffer_id(buffer.read(cx).remote_id(), cx)
        else {
            return Task::ready(Err(anyhow!("failed to find git repository for buffer")));
        };

        // Deduplicate concurrent loads via a shared task keyed by (buffer, kind).
        let task = self
            .loading_diffs
            .entry((buffer_id, DiffKind::Uncommitted))
            .or_insert_with(|| {
                let changes = repo.update(cx, |repo, cx| {
                    repo.load_committed_text(buffer_id, repo_path, cx)
                });

                // todo(lw): hot foreground spawn
                cx.spawn(async move |this, cx| {
                    Self::open_diff_internal(this, DiffKind::Uncommitted, changes.await, buffer, cx)
                        .await
                        .map_err(Arc::new)
                })
                .shared()
            })
            .clone();

        cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
891
    /// Shared tail of `open_unstaged_diff`/`open_uncommitted_diff`: records
    /// the loaded base texts in the buffer's git state, creates the
    /// `BufferDiff` entity (wiring an unstaged secondary diff for uncommitted
    /// diffs), and waits for the first recalculation before resolving.
    #[ztracing::instrument(skip_all)]
    async fn open_diff_internal(
        this: WeakEntity<Self>,
        kind: DiffKind,
        texts: Result<DiffBasesChange>,
        buffer_entity: Entity<Buffer>,
        cx: &mut AsyncApp,
    ) -> Result<Entity<BufferDiff>> {
        let diff_bases_change = match texts {
            Err(e) => {
                // Loading the base text failed: clear the in-flight marker so
                // a later call can retry, then propagate the error.
                this.update(cx, |this, cx| {
                    let buffer = buffer_entity.read(cx);
                    let buffer_id = buffer.remote_id();
                    this.loading_diffs.remove(&(buffer_id, kind));
                })?;
                return Err(e);
            }
            Ok(change) => change,
        };

        this.update(cx, |this, cx| {
            let buffer = buffer_entity.read(cx);
            let buffer_id = buffer.remote_id();
            let language = buffer.language().cloned();
            let language_registry = buffer.language_registry();
            let text_snapshot = buffer.text_snapshot();
            this.loading_diffs.remove(&(buffer_id, kind));

            let git_store = cx.weak_entity();
            let diff_state = this
                .diffs
                .entry(buffer_id)
                .or_insert_with(|| cx.new(|_| BufferGitState::new(git_store)));

            let diff = cx.new(|cx| BufferDiff::new(&text_snapshot, cx));

            cx.subscribe(&diff, Self::on_buffer_diff_event).detach();
            diff_state.update(cx, |diff_state, cx| {
                diff_state.language_changed = true;
                diff_state.language = language;
                diff_state.language_registry = language_registry;

                match kind {
                    DiffKind::Unstaged => diff_state.unstaged_diff = Some(diff.downgrade()),
                    DiffKind::Uncommitted => {
                        // An uncommitted diff needs an unstaged diff as its
                        // secondary; reuse a live one or create a fresh one.
                        let unstaged_diff = if let Some(diff) = diff_state.unstaged_diff() {
                            diff
                        } else {
                            let unstaged_diff = cx.new(|cx| BufferDiff::new(&text_snapshot, cx));
                            diff_state.unstaged_diff = Some(unstaged_diff.downgrade());
                            unstaged_diff
                        };

                        diff.update(cx, |diff, _| diff.set_secondary_diff(unstaged_diff));
                        diff_state.uncommitted_diff = Some(diff.downgrade())
                    }
                    DiffKind::SinceOid(_) => {
                        unreachable!("open_diff_internal is not used for OID diffs")
                    }
                }

                // Kick off the recalculation and wait for it (if one started)
                // before handing the diff back.
                diff_state.diff_bases_changed(text_snapshot, Some(diff_bases_change), cx);
                let rx = diff_state.wait_for_recalculation();

                anyhow::Ok(async move {
                    if let Some(rx) = rx {
                        rx.await;
                    }
                    Ok(diff)
                })
            })
        })??
        .await
    }
966
967 pub fn get_unstaged_diff(&self, buffer_id: BufferId, cx: &App) -> Option<Entity<BufferDiff>> {
968 let diff_state = self.diffs.get(&buffer_id)?;
969 diff_state.read(cx).unstaged_diff.as_ref()?.upgrade()
970 }
971
972 pub fn get_uncommitted_diff(
973 &self,
974 buffer_id: BufferId,
975 cx: &App,
976 ) -> Option<Entity<BufferDiff>> {
977 let diff_state = self.diffs.get(&buffer_id)?;
978 diff_state.read(cx).uncommitted_diff.as_ref()?.upgrade()
979 }
980
981 pub fn get_diff_since_oid(
982 &self,
983 buffer_id: BufferId,
984 oid: Option<git::Oid>,
985 cx: &App,
986 ) -> Option<Entity<BufferDiff>> {
987 let diff_state = self.diffs.get(&buffer_id)?;
988 diff_state.read(cx).oid_diff(oid)
989 }
990
991 pub fn open_conflict_set(
992 &mut self,
993 buffer: Entity<Buffer>,
994 cx: &mut Context<Self>,
995 ) -> Entity<ConflictSet> {
996 log::debug!("open conflict set");
997 let buffer_id = buffer.read(cx).remote_id();
998
999 if let Some(git_state) = self.diffs.get(&buffer_id)
1000 && let Some(conflict_set) = git_state
1001 .read(cx)
1002 .conflict_set
1003 .as_ref()
1004 .and_then(|weak| weak.upgrade())
1005 {
1006 let conflict_set = conflict_set;
1007 let buffer_snapshot = buffer.read(cx).text_snapshot();
1008
1009 git_state.update(cx, |state, cx| {
1010 let _ = state.reparse_conflict_markers(buffer_snapshot, cx);
1011 });
1012
1013 return conflict_set;
1014 }
1015
1016 let is_unmerged = self
1017 .repository_and_path_for_buffer_id(buffer_id, cx)
1018 .is_some_and(|(repo, path)| repo.read(cx).snapshot.has_conflict(&path));
1019 let git_store = cx.weak_entity();
1020 let buffer_git_state = self
1021 .diffs
1022 .entry(buffer_id)
1023 .or_insert_with(|| cx.new(|_| BufferGitState::new(git_store)));
1024 let conflict_set = cx.new(|cx| ConflictSet::new(buffer_id, is_unmerged, cx));
1025
1026 self._subscriptions
1027 .push(cx.subscribe(&conflict_set, |_, _, _, cx| {
1028 cx.emit(GitStoreEvent::ConflictsUpdated);
1029 }));
1030
1031 buffer_git_state.update(cx, |state, cx| {
1032 state.conflict_set = Some(conflict_set.downgrade());
1033 let buffer_snapshot = buffer.read(cx).text_snapshot();
1034 let _ = state.reparse_conflict_markers(buffer_snapshot, cx);
1035 });
1036
1037 conflict_set
1038 }
1039
1040 pub fn project_path_git_status(
1041 &self,
1042 project_path: &ProjectPath,
1043 cx: &App,
1044 ) -> Option<FileStatus> {
1045 let (repo, repo_path) = self.repository_and_path_for_project_path(project_path, cx)?;
1046 Some(repo.read(cx).status_for_path(&repo_path)?.status)
1047 }
1048
    /// Creates a checkpoint of every repository in the store, resolving to a
    /// [`GitStoreCheckpoint`] keyed by working-directory path.
    pub fn checkpoint(&self, cx: &mut App) -> Task<Result<GitStoreCheckpoint>> {
        let mut work_directory_abs_paths = Vec::new();
        let mut checkpoints = Vec::new();
        for repository in self.repositories.values() {
            repository.update(cx, |repository, _| {
                work_directory_abs_paths.push(repository.snapshot.work_directory_abs_path.clone());
                // The map + `?` presumably flattens the task's nested Result
                // into a single error — TODO confirm `checkpoint()`'s type.
                checkpoints.push(repository.checkpoint().map(|checkpoint| checkpoint?));
            });
        }

        cx.background_executor().spawn(async move {
            // Fail the whole checkpoint if any repository fails.
            let checkpoints = future::try_join_all(checkpoints).await?;
            Ok(GitStoreCheckpoint {
                checkpoints_by_work_dir_abs_path: work_directory_abs_paths
                    .into_iter()
                    .zip(checkpoints)
                    .collect(),
            })
        })
    }
1069
1070 pub fn restore_checkpoint(
1071 &self,
1072 checkpoint: GitStoreCheckpoint,
1073 cx: &mut App,
1074 ) -> Task<Result<()>> {
1075 let repositories_by_work_dir_abs_path = self
1076 .repositories
1077 .values()
1078 .map(|repo| (repo.read(cx).snapshot.work_directory_abs_path.clone(), repo))
1079 .collect::<HashMap<_, _>>();
1080
1081 let mut tasks = Vec::new();
1082 for (work_dir_abs_path, checkpoint) in checkpoint.checkpoints_by_work_dir_abs_path {
1083 if let Some(repository) = repositories_by_work_dir_abs_path.get(&work_dir_abs_path) {
1084 let restore = repository.update(cx, |repository, _| {
1085 repository.restore_checkpoint(checkpoint)
1086 });
1087 tasks.push(async move { restore.await? });
1088 }
1089 }
1090 cx.background_spawn(async move {
1091 future::try_join_all(tasks).await?;
1092 Ok(())
1093 })
1094 }
1095
1096 /// Compares two checkpoints, returning true if they are equal.
1097 pub fn compare_checkpoints(
1098 &self,
1099 left: GitStoreCheckpoint,
1100 mut right: GitStoreCheckpoint,
1101 cx: &mut App,
1102 ) -> Task<Result<bool>> {
1103 let repositories_by_work_dir_abs_path = self
1104 .repositories
1105 .values()
1106 .map(|repo| (repo.read(cx).snapshot.work_directory_abs_path.clone(), repo))
1107 .collect::<HashMap<_, _>>();
1108
1109 let mut tasks = Vec::new();
1110 for (work_dir_abs_path, left_checkpoint) in left.checkpoints_by_work_dir_abs_path {
1111 if let Some(right_checkpoint) = right
1112 .checkpoints_by_work_dir_abs_path
1113 .remove(&work_dir_abs_path)
1114 {
1115 if let Some(repository) = repositories_by_work_dir_abs_path.get(&work_dir_abs_path)
1116 {
1117 let compare = repository.update(cx, |repository, _| {
1118 repository.compare_checkpoints(left_checkpoint, right_checkpoint)
1119 });
1120
1121 tasks.push(async move { compare.await? });
1122 }
1123 } else {
1124 return Task::ready(Ok(false));
1125 }
1126 }
1127 cx.background_spawn(async move {
1128 Ok(future::try_join_all(tasks)
1129 .await?
1130 .into_iter()
1131 .all(|result| result))
1132 })
1133 }
1134
    /// Blames a buffer.
    ///
    /// When `version` is provided, the blame is computed against the buffer's
    /// content at that version; otherwise its current content is used. Local
    /// repositories run the blame through the git backend; remote projects
    /// forward the request upstream.
    pub fn blame_buffer(
        &self,
        buffer: &Entity<Buffer>,
        version: Option<clock::Global>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Option<Blame>>> {
        let buffer = buffer.read(cx);
        let Some((repo, repo_path)) =
            self.repository_and_path_for_buffer_id(buffer.remote_id(), cx)
        else {
            return Task::ready(Err(anyhow!("failed to find a git repository for buffer")));
        };
        // Snapshot the text (at the requested version, if any) before going async.
        let content = match &version {
            Some(version) => buffer.rope_for_version(version),
            None => buffer.as_rope().clone(),
        };
        let line_ending = buffer.line_ending();
        let version = version.unwrap_or(buffer.version());
        let buffer_id = buffer.remote_id();

        // Hold the repository weakly so the blame task doesn't keep it alive.
        let repo = repo.downgrade();
        cx.spawn(async move |_, cx| {
            let repository_state = repo
                .update(cx, |repo, _| repo.repository_state.clone())?
                .await
                .map_err(|err| anyhow::anyhow!(err))?;
            match repository_state {
                RepositoryState::Local(LocalRepositoryState { backend, .. }) => backend
                    .blame(repo_path.clone(), content, line_ending)
                    .await
                    .with_context(|| format!("Failed to blame {:?}", repo_path.as_ref()))
                    .map(Some),
                RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
                    let response = client
                        .request(proto::BlameBuffer {
                            project_id: project_id.to_proto(),
                            buffer_id: buffer_id.into(),
                            version: serialize_version(&version),
                        })
                        .await?;
                    Ok(deserialize_blame_buffer_response(response))
                }
            }
        })
    }
1181
1182 pub fn file_history(
1183 &self,
1184 repo: &Entity<Repository>,
1185 path: RepoPath,
1186 cx: &mut App,
1187 ) -> Task<Result<git::repository::FileHistory>> {
1188 let rx = repo.update(cx, |repo, _| repo.file_history(path));
1189
1190 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
1191 }
1192
1193 pub fn file_history_paginated(
1194 &self,
1195 repo: &Entity<Repository>,
1196 path: RepoPath,
1197 skip: usize,
1198 limit: Option<usize>,
1199 cx: &mut App,
1200 ) -> Task<Result<git::repository::FileHistory>> {
1201 let rx = repo.update(cx, |repo, _| repo.file_history_paginated(path, skip, limit));
1202
1203 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
1204 }
1205
    /// Builds a permalink URL to the given line range of `buffer` on its git
    /// hosting provider.
    ///
    /// If the buffer is not inside a git repository but is a Rust file, falls
    /// back to constructing a permalink from Cargo-registry metadata. For
    /// local repositories the link points at the current HEAD SHA; for remote
    /// repositories the request is forwarded to the host.
    pub fn get_permalink_to_line(
        &self,
        buffer: &Entity<Buffer>,
        selection: Range<u32>,
        cx: &mut App,
    ) -> Task<Result<url::Url>> {
        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        let Some((repo, repo_path)) = self.repository_and_path_for_project_path(
            &(file.worktree.read(cx).id(), file.path.clone()).into(),
            cx,
        ) else {
            // If we're not in a Git repo, check whether this is a Rust source
            // file in the Cargo registry (presumably opened with go-to-definition
            // from a normal Rust file). If so, we can put together a permalink
            // using crate metadata.
            if buffer
                .read(cx)
                .language()
                .is_none_or(|lang| lang.name() != "Rust")
            {
                return Task::ready(Err(anyhow!("no permalink available")));
            }
            let file_path = file.worktree.read(cx).absolutize(&file.path);
            return cx.spawn(async move |cx| {
                let provider_registry = cx.update(GitHostingProviderRegistry::default_global);
                get_permalink_in_rust_registry_src(provider_registry, file_path, selection)
                    .context("no permalink available")
            });
        };

        let buffer_id = buffer.read(cx).remote_id();
        // Prefer the current branch's upstream remote; fall back to "origin".
        let branch = repo.read(cx).branch.clone();
        let remote = branch
            .as_ref()
            .and_then(|b| b.upstream.as_ref())
            .and_then(|b| b.remote_name())
            .unwrap_or("origin")
            .to_string();

        let rx = repo.update(cx, |repo, _| {
            repo.send_job(None, move |state, cx| async move {
                match state {
                    RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
                        let origin_url = backend
                            .remote_url(&remote)
                            .await
                            .with_context(|| format!("remote \"{remote}\" not found"))?;

                        let sha = backend.head_sha().await.context("reading HEAD SHA")?;

                        let provider_registry =
                            cx.update(GitHostingProviderRegistry::default_global);

                        let (provider, remote) =
                            parse_git_remote_url(provider_registry, &origin_url)
                                .context("parsing Git remote URL")?;

                        Ok(provider.build_permalink(
                            remote,
                            BuildPermalinkParams::new(&sha, &repo_path, Some(selection)),
                        ))
                    }
                    RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
                        // The host resolves the permalink itself; we forward
                        // only the buffer id and selection.
                        let response = client
                            .request(proto::GetPermalinkToLine {
                                project_id: project_id.to_proto(),
                                buffer_id: buffer_id.into(),
                                selection: Some(proto::Range {
                                    start: selection.start as u64,
                                    end: selection.end as u64,
                                }),
                            })
                            .await?;

                        url::Url::parse(&response.permalink).context("failed to parse permalink")
                    }
                }
            })
        });
        cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
    }
1290
1291 fn downstream_client(&self) -> Option<(AnyProtoClient, ProjectId)> {
1292 match &self.state {
1293 GitStoreState::Local {
1294 downstream: downstream_client,
1295 ..
1296 } => downstream_client
1297 .as_ref()
1298 .map(|state| (state.client.clone(), state.project_id)),
1299 GitStoreState::Remote {
1300 downstream: downstream_client,
1301 ..
1302 } => downstream_client.clone(),
1303 }
1304 }
1305
1306 fn upstream_client(&self) -> Option<AnyProtoClient> {
1307 match &self.state {
1308 GitStoreState::Local { .. } => None,
1309 GitStoreState::Remote {
1310 upstream_client, ..
1311 } => Some(upstream_client.clone()),
1312 }
1313 }
1314
1315 fn on_worktree_store_event(
1316 &mut self,
1317 worktree_store: Entity<WorktreeStore>,
1318 event: &WorktreeStoreEvent,
1319 cx: &mut Context<Self>,
1320 ) {
1321 let GitStoreState::Local {
1322 project_environment,
1323 downstream,
1324 next_repository_id,
1325 fs,
1326 } = &self.state
1327 else {
1328 return;
1329 };
1330
1331 match event {
1332 WorktreeStoreEvent::WorktreeUpdatedEntries(worktree_id, updated_entries) => {
1333 if let Some(worktree) = self
1334 .worktree_store
1335 .read(cx)
1336 .worktree_for_id(*worktree_id, cx)
1337 {
1338 let paths_by_git_repo =
1339 self.process_updated_entries(&worktree, updated_entries, cx);
1340 let downstream = downstream
1341 .as_ref()
1342 .map(|downstream| downstream.updates_tx.clone());
1343 cx.spawn(async move |_, cx| {
1344 let paths_by_git_repo = paths_by_git_repo.await;
1345 for (repo, paths) in paths_by_git_repo {
1346 repo.update(cx, |repo, cx| {
1347 repo.paths_changed(paths, downstream.clone(), cx);
1348 });
1349 }
1350 })
1351 .detach();
1352 }
1353 }
1354 WorktreeStoreEvent::WorktreeUpdatedGitRepositories(worktree_id, changed_repos) => {
1355 let Some(worktree) = worktree_store.read(cx).worktree_for_id(*worktree_id, cx)
1356 else {
1357 return;
1358 };
1359 if !worktree.read(cx).is_visible() {
1360 log::debug!(
1361 "not adding repositories for local worktree {:?} because it's not visible",
1362 worktree.read(cx).abs_path()
1363 );
1364 return;
1365 }
1366 self.update_repositories_from_worktree(
1367 *worktree_id,
1368 project_environment.clone(),
1369 next_repository_id.clone(),
1370 downstream
1371 .as_ref()
1372 .map(|downstream| downstream.updates_tx.clone()),
1373 changed_repos.clone(),
1374 fs.clone(),
1375 cx,
1376 );
1377 self.local_worktree_git_repos_changed(worktree, changed_repos, cx);
1378 }
1379 WorktreeStoreEvent::WorktreeRemoved(_entity_id, worktree_id) => {
1380 let repos_without_worktree: Vec<RepositoryId> = self
1381 .worktree_ids
1382 .iter_mut()
1383 .filter_map(|(repo_id, worktree_ids)| {
1384 worktree_ids.remove(worktree_id);
1385 if worktree_ids.is_empty() {
1386 Some(*repo_id)
1387 } else {
1388 None
1389 }
1390 })
1391 .collect();
1392 let is_active_repo_removed = repos_without_worktree
1393 .iter()
1394 .any(|repo_id| self.active_repo_id == Some(*repo_id));
1395
1396 for repo_id in repos_without_worktree {
1397 self.repositories.remove(&repo_id);
1398 self.worktree_ids.remove(&repo_id);
1399 if let Some(updates_tx) =
1400 downstream.as_ref().map(|downstream| &downstream.updates_tx)
1401 {
1402 updates_tx
1403 .unbounded_send(DownstreamUpdate::RemoveRepository(repo_id))
1404 .ok();
1405 }
1406 }
1407
1408 if is_active_repo_removed {
1409 if let Some((&repo_id, _)) = self.repositories.iter().next() {
1410 self.active_repo_id = Some(repo_id);
1411 cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(repo_id)));
1412 } else {
1413 self.active_repo_id = None;
1414 cx.emit(GitStoreEvent::ActiveRepositoryChanged(None));
1415 }
1416 }
1417 }
1418 _ => {}
1419 }
1420 }
    /// Propagates a repository event to interested buffers: refreshes the
    /// conflict state of every open buffer that belongs to the updated
    /// repository, then re-emits the event as a `GitStoreEvent`.
    fn on_repository_event(
        &mut self,
        repo: Entity<Repository>,
        event: &RepositoryEvent,
        cx: &mut Context<Self>,
    ) {
        let id = repo.read(cx).id;
        let repo_snapshot = repo.read(cx).snapshot.clone();
        for (buffer_id, diff) in self.diffs.iter() {
            // Only buffers whose file resolves to this repository are affected.
            if let Some((buffer_repo, repo_path)) =
                self.repository_and_path_for_buffer_id(*buffer_id, cx)
                && buffer_repo == repo
            {
                diff.update(cx, |diff, cx| {
                    if let Some(conflict_set) = &diff.conflict_set {
                        // `?` aborts the closure if updating the weak
                        // conflict-set handle fails (entity released);
                        // the error is discarded via `.ok()` below.
                        let conflict_status_changed =
                            conflict_set.update(cx, |conflict_set, cx| {
                                let has_conflict = repo_snapshot.has_conflict(&repo_path);
                                conflict_set.set_has_conflict(has_conflict, cx)
                            })?;
                        if conflict_status_changed {
                            let buffer_store = self.buffer_store.read(cx);
                            if let Some(buffer) = buffer_store.get(*buffer_id) {
                                let _ = diff
                                    .reparse_conflict_markers(buffer.read(cx).text_snapshot(), cx);
                            }
                        }
                    }
                    anyhow::Ok(())
                })
                .ok();
            }
        }
        cx.emit(GitStoreEvent::RepositoryUpdated(
            id,
            event.clone(),
            self.active_repo_id == Some(id),
        ))
    }
1460
    /// Re-broadcasts a repository's `JobsUpdated` notification as a store-level event.
    fn on_jobs_updated(&mut self, _: Entity<Repository>, _: &JobsUpdated, cx: &mut Context<Self>) {
        cx.emit(GitStoreEvent::JobsUpdated)
    }
1464
1465 /// Update our list of repositories and schedule git scans in response to a notification from a worktree,
1466 fn update_repositories_from_worktree(
1467 &mut self,
1468 worktree_id: WorktreeId,
1469 project_environment: Entity<ProjectEnvironment>,
1470 next_repository_id: Arc<AtomicU64>,
1471 updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
1472 updated_git_repositories: UpdatedGitRepositoriesSet,
1473 fs: Arc<dyn Fs>,
1474 cx: &mut Context<Self>,
1475 ) {
1476 let mut removed_ids = Vec::new();
1477 for update in updated_git_repositories.iter() {
1478 if let Some((id, existing)) = self.repositories.iter().find(|(_, repo)| {
1479 let existing_work_directory_abs_path =
1480 repo.read(cx).work_directory_abs_path.clone();
1481 Some(&existing_work_directory_abs_path)
1482 == update.old_work_directory_abs_path.as_ref()
1483 || Some(&existing_work_directory_abs_path)
1484 == update.new_work_directory_abs_path.as_ref()
1485 }) {
1486 let repo_id = *id;
1487 if let Some(new_work_directory_abs_path) =
1488 update.new_work_directory_abs_path.clone()
1489 {
1490 self.worktree_ids
1491 .entry(repo_id)
1492 .or_insert_with(HashSet::new)
1493 .insert(worktree_id);
1494 existing.update(cx, |existing, cx| {
1495 existing.snapshot.work_directory_abs_path = new_work_directory_abs_path;
1496 existing.schedule_scan(updates_tx.clone(), cx);
1497 });
1498 } else {
1499 if let Some(worktree_ids) = self.worktree_ids.get_mut(&repo_id) {
1500 worktree_ids.remove(&worktree_id);
1501 if worktree_ids.is_empty() {
1502 removed_ids.push(repo_id);
1503 }
1504 }
1505 }
1506 } else if let UpdatedGitRepository {
1507 new_work_directory_abs_path: Some(work_directory_abs_path),
1508 dot_git_abs_path: Some(dot_git_abs_path),
1509 repository_dir_abs_path: Some(_repository_dir_abs_path),
1510 common_dir_abs_path: Some(_common_dir_abs_path),
1511 ..
1512 } = update
1513 {
1514 let id = RepositoryId(next_repository_id.fetch_add(1, atomic::Ordering::Release));
1515 let git_store = cx.weak_entity();
1516 let repo = cx.new(|cx| {
1517 let mut repo = Repository::local(
1518 id,
1519 work_directory_abs_path.clone(),
1520 dot_git_abs_path.clone(),
1521 project_environment.downgrade(),
1522 fs.clone(),
1523 git_store,
1524 cx,
1525 );
1526 if let Some(updates_tx) = updates_tx.as_ref() {
1527 // trigger an empty `UpdateRepository` to ensure remote active_repo_id is set correctly
1528 updates_tx
1529 .unbounded_send(DownstreamUpdate::UpdateRepository(repo.snapshot()))
1530 .ok();
1531 }
1532 repo.schedule_scan(updates_tx.clone(), cx);
1533 repo
1534 });
1535 self._subscriptions
1536 .push(cx.subscribe(&repo, Self::on_repository_event));
1537 self._subscriptions
1538 .push(cx.subscribe(&repo, Self::on_jobs_updated));
1539 self.repositories.insert(id, repo);
1540 self.worktree_ids.insert(id, HashSet::from([worktree_id]));
1541 cx.emit(GitStoreEvent::RepositoryAdded);
1542 self.active_repo_id.get_or_insert_with(|| {
1543 cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
1544 id
1545 });
1546 }
1547 }
1548
1549 for id in removed_ids {
1550 if self.active_repo_id == Some(id) {
1551 self.active_repo_id = None;
1552 cx.emit(GitStoreEvent::ActiveRepositoryChanged(None));
1553 }
1554 self.repositories.remove(&id);
1555 if let Some(updates_tx) = updates_tx.as_ref() {
1556 updates_tx
1557 .unbounded_send(DownstreamUpdate::RemoveRepository(id))
1558 .ok();
1559 }
1560 }
1561 }
1562
    /// Keeps per-buffer git state in sync with buffer-store lifecycle events
    /// (creation, shared-buffer closure, drops, and file-path changes).
    fn on_buffer_store_event(
        &mut self,
        _: Entity<BufferStore>,
        event: &BufferStoreEvent,
        cx: &mut Context<Self>,
    ) {
        match event {
            BufferStoreEvent::BufferAdded(buffer) => {
                // Notify the buffer's diff state when its language changes.
                cx.subscribe(buffer, |this, buffer, event, cx| {
                    if let BufferEvent::LanguageChanged(_) = event {
                        let buffer_id = buffer.read(cx).remote_id();
                        if let Some(diff_state) = this.diffs.get(&buffer_id) {
                            diff_state.update(cx, |diff_state, cx| {
                                diff_state.buffer_language_changed(buffer, cx);
                            });
                        }
                    }
                })
                .detach();
            }
            BufferStoreEvent::SharedBufferClosed(peer_id, buffer_id) => {
                if let Some(diffs) = self.shared_diffs.get_mut(peer_id) {
                    diffs.remove(buffer_id);
                }
            }
            BufferStoreEvent::BufferDropped(buffer_id) => {
                // Drop all diff state for the buffer, including any copies
                // shared with remote peers.
                self.diffs.remove(buffer_id);
                for diffs in self.shared_diffs.values_mut() {
                    diffs.remove(buffer_id);
                }
            }
            BufferStoreEvent::BufferChangedFilePath { buffer, .. } => {
                // Whenever a buffer's file path changes, it's possible that the
                // new path is actually a path that is being tracked by a git
                // repository. In that case, we'll want to update the buffer's
                // `BufferDiffState`, in case it already has one.
                let buffer_id = buffer.read(cx).remote_id();
                let diff_state = self.diffs.get(&buffer_id);
                let repo = self.repository_and_path_for_buffer_id(buffer_id, cx);

                if let Some(diff_state) = diff_state
                    && let Some((repo, repo_path)) = repo
                {
                    let buffer = buffer.clone();
                    let diff_state = diff_state.clone();

                    cx.spawn(async move |_git_store, cx| {
                        // Best effort: a failure only means the diff base is
                        // stale, so log it and move on.
                        async {
                            let diff_bases_change = repo
                                .update(cx, |repo, cx| {
                                    repo.load_committed_text(buffer_id, repo_path, cx)
                                })
                                .await?;

                            diff_state.update(cx, |diff_state, cx| {
                                let buffer_snapshot = buffer.read(cx).text_snapshot();
                                diff_state.diff_bases_changed(
                                    buffer_snapshot,
                                    Some(diff_bases_change),
                                    cx,
                                );
                            });
                            anyhow::Ok(())
                        }
                        .await
                        .log_err();
                    })
                    .detach();
                }
            }
        }
    }
1635
1636 pub fn recalculate_buffer_diffs(
1637 &mut self,
1638 buffers: Vec<Entity<Buffer>>,
1639 cx: &mut Context<Self>,
1640 ) -> impl Future<Output = ()> + use<> {
1641 let mut futures = Vec::new();
1642 for buffer in buffers {
1643 if let Some(diff_state) = self.diffs.get_mut(&buffer.read(cx).remote_id()) {
1644 let buffer = buffer.read(cx).text_snapshot();
1645 diff_state.update(cx, |diff_state, cx| {
1646 diff_state.recalculate_diffs(buffer.clone(), cx);
1647 futures.extend(diff_state.wait_for_recalculation().map(FutureExt::boxed));
1648 });
1649 futures.push(diff_state.update(cx, |diff_state, cx| {
1650 diff_state
1651 .reparse_conflict_markers(buffer, cx)
1652 .map(|_| {})
1653 .boxed()
1654 }));
1655 }
1656 }
1657 async move {
1658 futures::future::join_all(futures).await;
1659 }
1660 }
1661
    /// Reacts to hunks being staged or unstaged in a buffer's diff by writing
    /// the new index text to the repository.
    fn on_buffer_diff_event(
        &mut self,
        diff: Entity<buffer_diff::BufferDiff>,
        event: &BufferDiffEvent,
        cx: &mut Context<Self>,
    ) {
        if let BufferDiffEvent::HunksStagedOrUnstaged(new_index_text) = event {
            let buffer_id = diff.read(cx).buffer_id;
            if let Some(diff_state) = self.diffs.get(&buffer_id) {
                // Bump and capture the operation count for this staging
                // request — presumably used to order/deduplicate concurrent
                // index writes; confirm against `spawn_set_index_text_job`.
                let hunk_staging_operation_count = diff_state.update(cx, |diff_state, _| {
                    diff_state.hunk_staging_operation_count += 1;
                    diff_state.hunk_staging_operation_count
                });
                if let Some((repo, path)) = self.repository_and_path_for_buffer_id(buffer_id, cx) {
                    let recv = repo.update(cx, |repo, cx| {
                        log::debug!("hunks changed for {}", path.as_unix_str());
                        repo.spawn_set_index_text_job(
                            path,
                            new_index_text.as_ref().map(|rope| rope.to_string()),
                            Some(hunk_staging_operation_count),
                            cx,
                        )
                    });
                    let diff = diff.downgrade();
                    cx.spawn(async move |this, cx| {
                        // On failure, roll back any pending hunk state and
                        // surface the error as an event.
                        if let Ok(Err(error)) = cx.background_spawn(recv).await {
                            diff.update(cx, |diff, cx| {
                                diff.clear_pending_hunks(cx);
                            })
                            .ok();
                            this.update(cx, |_, cx| cx.emit(GitStoreEvent::IndexWriteError(error)))
                                .ok();
                        }
                    })
                    .detach();
                }
            }
        }
    }
1701
1702 fn local_worktree_git_repos_changed(
1703 &mut self,
1704 worktree: Entity<Worktree>,
1705 changed_repos: &UpdatedGitRepositoriesSet,
1706 cx: &mut Context<Self>,
1707 ) {
1708 log::debug!("local worktree repos changed");
1709 debug_assert!(worktree.read(cx).is_local());
1710
1711 for repository in self.repositories.values() {
1712 repository.update(cx, |repository, cx| {
1713 let repo_abs_path = &repository.work_directory_abs_path;
1714 if changed_repos.iter().any(|update| {
1715 update.old_work_directory_abs_path.as_ref() == Some(repo_abs_path)
1716 || update.new_work_directory_abs_path.as_ref() == Some(repo_abs_path)
1717 }) {
1718 repository.reload_buffer_diff_bases(cx);
1719 }
1720 });
1721 }
1722 }
1723
    /// Returns all repositories tracked by this store, keyed by repository id.
    pub fn repositories(&self) -> &HashMap<RepositoryId, Entity<Repository>> {
        &self.repositories
    }
1727
1728 pub fn status_for_buffer_id(&self, buffer_id: BufferId, cx: &App) -> Option<FileStatus> {
1729 let (repo, path) = self.repository_and_path_for_buffer_id(buffer_id, cx)?;
1730 let status = repo.read(cx).snapshot.status_for_path(&path)?;
1731 Some(status.status)
1732 }
1733
1734 pub fn repository_and_path_for_buffer_id(
1735 &self,
1736 buffer_id: BufferId,
1737 cx: &App,
1738 ) -> Option<(Entity<Repository>, RepoPath)> {
1739 let buffer = self.buffer_store.read(cx).get(buffer_id)?;
1740 let project_path = buffer.read(cx).project_path(cx)?;
1741 self.repository_and_path_for_project_path(&project_path, cx)
1742 }
1743
1744 pub fn repository_and_path_for_project_path(
1745 &self,
1746 path: &ProjectPath,
1747 cx: &App,
1748 ) -> Option<(Entity<Repository>, RepoPath)> {
1749 let abs_path = self.worktree_store.read(cx).absolutize(path, cx)?;
1750 self.repositories
1751 .values()
1752 .filter_map(|repo| {
1753 let repo_path = repo.read(cx).abs_path_to_repo_path(&abs_path)?;
1754 Some((repo.clone(), repo_path))
1755 })
1756 .max_by_key(|(repo, _)| repo.read(cx).work_directory_abs_path.clone())
1757 }
1758
1759 pub fn git_init(
1760 &self,
1761 path: Arc<Path>,
1762 fallback_branch_name: String,
1763 cx: &App,
1764 ) -> Task<Result<()>> {
1765 match &self.state {
1766 GitStoreState::Local { fs, .. } => {
1767 let fs = fs.clone();
1768 cx.background_executor()
1769 .spawn(async move { fs.git_init(&path, fallback_branch_name).await })
1770 }
1771 GitStoreState::Remote {
1772 upstream_client,
1773 upstream_project_id: project_id,
1774 ..
1775 } => {
1776 let client = upstream_client.clone();
1777 let project_id = *project_id;
1778 cx.background_executor().spawn(async move {
1779 client
1780 .request(proto::GitInit {
1781 project_id: project_id,
1782 abs_path: path.to_string_lossy().into_owned(),
1783 fallback_branch_name,
1784 })
1785 .await?;
1786 Ok(())
1787 })
1788 }
1789 }
1790 }
1791
1792 pub fn git_clone(
1793 &self,
1794 repo: String,
1795 path: impl Into<Arc<std::path::Path>>,
1796 cx: &App,
1797 ) -> Task<Result<()>> {
1798 let path = path.into();
1799 match &self.state {
1800 GitStoreState::Local { fs, .. } => {
1801 let fs = fs.clone();
1802 cx.background_executor()
1803 .spawn(async move { fs.git_clone(&repo, &path).await })
1804 }
1805 GitStoreState::Remote {
1806 upstream_client,
1807 upstream_project_id,
1808 ..
1809 } => {
1810 if upstream_client.is_via_collab() {
1811 return Task::ready(Err(anyhow!(
1812 "Git Clone isn't supported for project guests"
1813 )));
1814 }
1815 let request = upstream_client.request(proto::GitClone {
1816 project_id: *upstream_project_id,
1817 abs_path: path.to_string_lossy().into_owned(),
1818 remote_repo: repo,
1819 });
1820
1821 cx.background_spawn(async move {
1822 let result = request.await?;
1823
1824 match result.success {
1825 true => Ok(()),
1826 false => Err(anyhow!("Git Clone failed")),
1827 }
1828 })
1829 }
1830 }
1831 }
1832
    /// Handles an `UpdateRepository` message from the upstream host: creates
    /// the remote repository entity on first sight, applies the update, and
    /// re-forwards it to any downstream collaborators.
    async fn handle_update_repository(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::UpdateRepository>,
        mut cx: AsyncApp,
    ) -> Result<()> {
        this.update(&mut cx, |this, cx| {
            let path_style = this.worktree_store.read(cx).path_style();
            let mut update = envelope.payload;

            let id = RepositoryId::from_proto(update.id);
            let client = this.upstream_client().context("no upstream client")?;

            // The subscription is captured here and pushed after the entry
            // call, since `this.repositories` is borrowed mutably inside
            // `or_insert_with`.
            let mut repo_subscription = None;
            let repo = this.repositories.entry(id).or_insert_with(|| {
                let git_store = cx.weak_entity();
                let repo = cx.new(|cx| {
                    Repository::remote(
                        id,
                        Path::new(&update.abs_path).into(),
                        path_style,
                        ProjectId(update.project_id),
                        client,
                        git_store,
                        cx,
                    )
                });
                repo_subscription = Some(cx.subscribe(&repo, Self::on_repository_event));
                cx.emit(GitStoreEvent::RepositoryAdded);
                repo
            });
            this._subscriptions.extend(repo_subscription);

            repo.update(cx, {
                let update = update.clone();
                |repo, cx| repo.apply_remote_update(update, cx)
            })?;

            // The first repository we hear about becomes the active one.
            this.active_repo_id.get_or_insert_with(|| {
                cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
                id
            });

            // Re-forward to downstream collaborators under our project id.
            if let Some((client, project_id)) = this.downstream_client() {
                update.project_id = project_id.to_proto();
                client.send(update).log_err();
            }
            Ok(())
        })
    }
1882
1883 async fn handle_remove_repository(
1884 this: Entity<Self>,
1885 envelope: TypedEnvelope<proto::RemoveRepository>,
1886 mut cx: AsyncApp,
1887 ) -> Result<()> {
1888 this.update(&mut cx, |this, cx| {
1889 let mut update = envelope.payload;
1890 let id = RepositoryId::from_proto(update.id);
1891 this.repositories.remove(&id);
1892 if let Some((client, project_id)) = this.downstream_client() {
1893 update.project_id = project_id.to_proto();
1894 client.send(update).log_err();
1895 }
1896 if this.active_repo_id == Some(id) {
1897 this.active_repo_id = None;
1898 cx.emit(GitStoreEvent::ActiveRepositoryChanged(None));
1899 }
1900 cx.emit(GitStoreEvent::RepositoryRemoved(id));
1901 });
1902 Ok(())
1903 }
1904
1905 async fn handle_git_init(
1906 this: Entity<Self>,
1907 envelope: TypedEnvelope<proto::GitInit>,
1908 cx: AsyncApp,
1909 ) -> Result<proto::Ack> {
1910 let path: Arc<Path> = PathBuf::from(envelope.payload.abs_path).into();
1911 let name = envelope.payload.fallback_branch_name;
1912 cx.update(|cx| this.read(cx).git_init(path, name, cx))
1913 .await?;
1914
1915 Ok(proto::Ack {})
1916 }
1917
1918 async fn handle_git_clone(
1919 this: Entity<Self>,
1920 envelope: TypedEnvelope<proto::GitClone>,
1921 cx: AsyncApp,
1922 ) -> Result<proto::GitCloneResponse> {
1923 let path: Arc<Path> = PathBuf::from(envelope.payload.abs_path).into();
1924 let repo_name = envelope.payload.remote_repo;
1925 let result = cx
1926 .update(|cx| this.read(cx).git_clone(repo_name, path, cx))
1927 .await;
1928
1929 Ok(proto::GitCloneResponse {
1930 success: result.is_ok(),
1931 })
1932 }
1933
1934 async fn handle_fetch(
1935 this: Entity<Self>,
1936 envelope: TypedEnvelope<proto::Fetch>,
1937 mut cx: AsyncApp,
1938 ) -> Result<proto::RemoteMessageResponse> {
1939 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1940 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1941 let fetch_options = FetchOptions::from_proto(envelope.payload.remote);
1942 let askpass_id = envelope.payload.askpass_id;
1943
1944 let askpass = make_remote_delegate(
1945 this,
1946 envelope.payload.project_id,
1947 repository_id,
1948 askpass_id,
1949 &mut cx,
1950 );
1951
1952 let remote_output = repository_handle
1953 .update(&mut cx, |repository_handle, cx| {
1954 repository_handle.fetch(fetch_options, askpass, cx)
1955 })
1956 .await??;
1957
1958 Ok(proto::RemoteMessageResponse {
1959 stdout: remote_output.stdout,
1960 stderr: remote_output.stderr,
1961 })
1962 }
1963
1964 async fn handle_push(
1965 this: Entity<Self>,
1966 envelope: TypedEnvelope<proto::Push>,
1967 mut cx: AsyncApp,
1968 ) -> Result<proto::RemoteMessageResponse> {
1969 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1970 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1971
1972 let askpass_id = envelope.payload.askpass_id;
1973 let askpass = make_remote_delegate(
1974 this,
1975 envelope.payload.project_id,
1976 repository_id,
1977 askpass_id,
1978 &mut cx,
1979 );
1980
1981 let options = envelope
1982 .payload
1983 .options
1984 .as_ref()
1985 .map(|_| match envelope.payload.options() {
1986 proto::push::PushOptions::SetUpstream => git::repository::PushOptions::SetUpstream,
1987 proto::push::PushOptions::Force => git::repository::PushOptions::Force,
1988 });
1989
1990 let branch_name = envelope.payload.branch_name.into();
1991 let remote_branch_name = envelope.payload.remote_branch_name.into();
1992 let remote_name = envelope.payload.remote_name.into();
1993
1994 let remote_output = repository_handle
1995 .update(&mut cx, |repository_handle, cx| {
1996 repository_handle.push(
1997 branch_name,
1998 remote_branch_name,
1999 remote_name,
2000 options,
2001 askpass,
2002 cx,
2003 )
2004 })
2005 .await??;
2006 Ok(proto::RemoteMessageResponse {
2007 stdout: remote_output.stdout,
2008 stderr: remote_output.stderr,
2009 })
2010 }
2011
2012 async fn handle_pull(
2013 this: Entity<Self>,
2014 envelope: TypedEnvelope<proto::Pull>,
2015 mut cx: AsyncApp,
2016 ) -> Result<proto::RemoteMessageResponse> {
2017 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2018 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2019 let askpass_id = envelope.payload.askpass_id;
2020 let askpass = make_remote_delegate(
2021 this,
2022 envelope.payload.project_id,
2023 repository_id,
2024 askpass_id,
2025 &mut cx,
2026 );
2027
2028 let branch_name = envelope.payload.branch_name.map(|name| name.into());
2029 let remote_name = envelope.payload.remote_name.into();
2030 let rebase = envelope.payload.rebase;
2031
2032 let remote_message = repository_handle
2033 .update(&mut cx, |repository_handle, cx| {
2034 repository_handle.pull(branch_name, remote_name, rebase, askpass, cx)
2035 })
2036 .await??;
2037
2038 Ok(proto::RemoteMessageResponse {
2039 stdout: remote_message.stdout,
2040 stderr: remote_message.stderr,
2041 })
2042 }
2043
2044 async fn handle_stage(
2045 this: Entity<Self>,
2046 envelope: TypedEnvelope<proto::Stage>,
2047 mut cx: AsyncApp,
2048 ) -> Result<proto::Ack> {
2049 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2050 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2051
2052 let entries = envelope
2053 .payload
2054 .paths
2055 .into_iter()
2056 .map(|path| RepoPath::new(&path))
2057 .collect::<Result<Vec<_>>>()?;
2058
2059 repository_handle
2060 .update(&mut cx, |repository_handle, cx| {
2061 repository_handle.stage_entries(entries, cx)
2062 })
2063 .await?;
2064 Ok(proto::Ack {})
2065 }
2066
2067 async fn handle_unstage(
2068 this: Entity<Self>,
2069 envelope: TypedEnvelope<proto::Unstage>,
2070 mut cx: AsyncApp,
2071 ) -> Result<proto::Ack> {
2072 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2073 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2074
2075 let entries = envelope
2076 .payload
2077 .paths
2078 .into_iter()
2079 .map(|path| RepoPath::new(&path))
2080 .collect::<Result<Vec<_>>>()?;
2081
2082 repository_handle
2083 .update(&mut cx, |repository_handle, cx| {
2084 repository_handle.unstage_entries(entries, cx)
2085 })
2086 .await?;
2087
2088 Ok(proto::Ack {})
2089 }
2090
2091 async fn handle_stash(
2092 this: Entity<Self>,
2093 envelope: TypedEnvelope<proto::Stash>,
2094 mut cx: AsyncApp,
2095 ) -> Result<proto::Ack> {
2096 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2097 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2098
2099 let entries = envelope
2100 .payload
2101 .paths
2102 .into_iter()
2103 .map(|path| RepoPath::new(&path))
2104 .collect::<Result<Vec<_>>>()?;
2105
2106 repository_handle
2107 .update(&mut cx, |repository_handle, cx| {
2108 repository_handle.stash_entries(entries, cx)
2109 })
2110 .await?;
2111
2112 Ok(proto::Ack {})
2113 }
2114
2115 async fn handle_stash_pop(
2116 this: Entity<Self>,
2117 envelope: TypedEnvelope<proto::StashPop>,
2118 mut cx: AsyncApp,
2119 ) -> Result<proto::Ack> {
2120 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2121 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2122 let stash_index = envelope.payload.stash_index.map(|i| i as usize);
2123
2124 repository_handle
2125 .update(&mut cx, |repository_handle, cx| {
2126 repository_handle.stash_pop(stash_index, cx)
2127 })
2128 .await?;
2129
2130 Ok(proto::Ack {})
2131 }
2132
2133 async fn handle_stash_apply(
2134 this: Entity<Self>,
2135 envelope: TypedEnvelope<proto::StashApply>,
2136 mut cx: AsyncApp,
2137 ) -> Result<proto::Ack> {
2138 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2139 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2140 let stash_index = envelope.payload.stash_index.map(|i| i as usize);
2141
2142 repository_handle
2143 .update(&mut cx, |repository_handle, cx| {
2144 repository_handle.stash_apply(stash_index, cx)
2145 })
2146 .await?;
2147
2148 Ok(proto::Ack {})
2149 }
2150
2151 async fn handle_stash_drop(
2152 this: Entity<Self>,
2153 envelope: TypedEnvelope<proto::StashDrop>,
2154 mut cx: AsyncApp,
2155 ) -> Result<proto::Ack> {
2156 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2157 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2158 let stash_index = envelope.payload.stash_index.map(|i| i as usize);
2159
2160 repository_handle
2161 .update(&mut cx, |repository_handle, cx| {
2162 repository_handle.stash_drop(stash_index, cx)
2163 })
2164 .await??;
2165
2166 Ok(proto::Ack {})
2167 }
2168
2169 async fn handle_set_index_text(
2170 this: Entity<Self>,
2171 envelope: TypedEnvelope<proto::SetIndexText>,
2172 mut cx: AsyncApp,
2173 ) -> Result<proto::Ack> {
2174 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2175 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2176 let repo_path = RepoPath::from_proto(&envelope.payload.path)?;
2177
2178 repository_handle
2179 .update(&mut cx, |repository_handle, cx| {
2180 repository_handle.spawn_set_index_text_job(
2181 repo_path,
2182 envelope.payload.text,
2183 None,
2184 cx,
2185 )
2186 })
2187 .await??;
2188 Ok(proto::Ack {})
2189 }
2190
2191 async fn handle_run_hook(
2192 this: Entity<Self>,
2193 envelope: TypedEnvelope<proto::RunGitHook>,
2194 mut cx: AsyncApp,
2195 ) -> Result<proto::Ack> {
2196 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2197 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2198 let hook = RunHook::from_proto(envelope.payload.hook).context("invalid hook")?;
2199 repository_handle
2200 .update(&mut cx, |repository_handle, cx| {
2201 repository_handle.run_hook(hook, cx)
2202 })
2203 .await??;
2204 Ok(proto::Ack {})
2205 }
2206
2207 async fn handle_commit(
2208 this: Entity<Self>,
2209 envelope: TypedEnvelope<proto::Commit>,
2210 mut cx: AsyncApp,
2211 ) -> Result<proto::Ack> {
2212 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2213 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2214 let askpass_id = envelope.payload.askpass_id;
2215
2216 let askpass = make_remote_delegate(
2217 this,
2218 envelope.payload.project_id,
2219 repository_id,
2220 askpass_id,
2221 &mut cx,
2222 );
2223
2224 let message = SharedString::from(envelope.payload.message);
2225 let name = envelope.payload.name.map(SharedString::from);
2226 let email = envelope.payload.email.map(SharedString::from);
2227 let options = envelope.payload.options.unwrap_or_default();
2228
2229 repository_handle
2230 .update(&mut cx, |repository_handle, cx| {
2231 repository_handle.commit(
2232 message,
2233 name.zip(email),
2234 CommitOptions {
2235 amend: options.amend,
2236 signoff: options.signoff,
2237 },
2238 askpass,
2239 cx,
2240 )
2241 })
2242 .await??;
2243 Ok(proto::Ack {})
2244 }
2245
2246 async fn handle_get_remotes(
2247 this: Entity<Self>,
2248 envelope: TypedEnvelope<proto::GetRemotes>,
2249 mut cx: AsyncApp,
2250 ) -> Result<proto::GetRemotesResponse> {
2251 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2252 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2253
2254 let branch_name = envelope.payload.branch_name;
2255 let is_push = envelope.payload.is_push;
2256
2257 let remotes = repository_handle
2258 .update(&mut cx, |repository_handle, _| {
2259 repository_handle.get_remotes(branch_name, is_push)
2260 })
2261 .await??;
2262
2263 Ok(proto::GetRemotesResponse {
2264 remotes: remotes
2265 .into_iter()
2266 .map(|remotes| proto::get_remotes_response::Remote {
2267 name: remotes.name.to_string(),
2268 })
2269 .collect::<Vec<_>>(),
2270 })
2271 }
2272
2273 async fn handle_get_worktrees(
2274 this: Entity<Self>,
2275 envelope: TypedEnvelope<proto::GitGetWorktrees>,
2276 mut cx: AsyncApp,
2277 ) -> Result<proto::GitWorktreesResponse> {
2278 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2279 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2280
2281 let worktrees = repository_handle
2282 .update(&mut cx, |repository_handle, _| {
2283 repository_handle.worktrees()
2284 })
2285 .await??;
2286
2287 Ok(proto::GitWorktreesResponse {
2288 worktrees: worktrees
2289 .into_iter()
2290 .map(|worktree| worktree_to_proto(&worktree))
2291 .collect::<Vec<_>>(),
2292 })
2293 }
2294
2295 async fn handle_create_worktree(
2296 this: Entity<Self>,
2297 envelope: TypedEnvelope<proto::GitCreateWorktree>,
2298 mut cx: AsyncApp,
2299 ) -> Result<proto::Ack> {
2300 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2301 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2302 let directory = PathBuf::from(envelope.payload.directory);
2303 let name = envelope.payload.name;
2304 let commit = envelope.payload.commit;
2305
2306 repository_handle
2307 .update(&mut cx, |repository_handle, _| {
2308 repository_handle.create_worktree(name, directory, commit)
2309 })
2310 .await??;
2311
2312 Ok(proto::Ack {})
2313 }
2314
2315 async fn handle_get_branches(
2316 this: Entity<Self>,
2317 envelope: TypedEnvelope<proto::GitGetBranches>,
2318 mut cx: AsyncApp,
2319 ) -> Result<proto::GitBranchesResponse> {
2320 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2321 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2322
2323 let branches = repository_handle
2324 .update(&mut cx, |repository_handle, _| repository_handle.branches())
2325 .await??;
2326
2327 Ok(proto::GitBranchesResponse {
2328 branches: branches
2329 .into_iter()
2330 .map(|branch| branch_to_proto(&branch))
2331 .collect::<Vec<_>>(),
2332 })
2333 }
2334 async fn handle_get_default_branch(
2335 this: Entity<Self>,
2336 envelope: TypedEnvelope<proto::GetDefaultBranch>,
2337 mut cx: AsyncApp,
2338 ) -> Result<proto::GetDefaultBranchResponse> {
2339 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2340 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2341
2342 let branch = repository_handle
2343 .update(&mut cx, |repository_handle, _| {
2344 repository_handle.default_branch(false)
2345 })
2346 .await??
2347 .map(Into::into);
2348
2349 Ok(proto::GetDefaultBranchResponse { branch })
2350 }
2351 async fn handle_create_branch(
2352 this: Entity<Self>,
2353 envelope: TypedEnvelope<proto::GitCreateBranch>,
2354 mut cx: AsyncApp,
2355 ) -> Result<proto::Ack> {
2356 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2357 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2358 let branch_name = envelope.payload.branch_name;
2359
2360 repository_handle
2361 .update(&mut cx, |repository_handle, _| {
2362 repository_handle.create_branch(branch_name, None)
2363 })
2364 .await??;
2365
2366 Ok(proto::Ack {})
2367 }
2368
2369 async fn handle_change_branch(
2370 this: Entity<Self>,
2371 envelope: TypedEnvelope<proto::GitChangeBranch>,
2372 mut cx: AsyncApp,
2373 ) -> Result<proto::Ack> {
2374 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2375 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2376 let branch_name = envelope.payload.branch_name;
2377
2378 repository_handle
2379 .update(&mut cx, |repository_handle, _| {
2380 repository_handle.change_branch(branch_name)
2381 })
2382 .await??;
2383
2384 Ok(proto::Ack {})
2385 }
2386
2387 async fn handle_rename_branch(
2388 this: Entity<Self>,
2389 envelope: TypedEnvelope<proto::GitRenameBranch>,
2390 mut cx: AsyncApp,
2391 ) -> Result<proto::Ack> {
2392 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2393 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2394 let branch = envelope.payload.branch;
2395 let new_name = envelope.payload.new_name;
2396
2397 repository_handle
2398 .update(&mut cx, |repository_handle, _| {
2399 repository_handle.rename_branch(branch, new_name)
2400 })
2401 .await??;
2402
2403 Ok(proto::Ack {})
2404 }
2405
2406 async fn handle_create_remote(
2407 this: Entity<Self>,
2408 envelope: TypedEnvelope<proto::GitCreateRemote>,
2409 mut cx: AsyncApp,
2410 ) -> Result<proto::Ack> {
2411 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2412 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2413 let remote_name = envelope.payload.remote_name;
2414 let remote_url = envelope.payload.remote_url;
2415
2416 repository_handle
2417 .update(&mut cx, |repository_handle, _| {
2418 repository_handle.create_remote(remote_name, remote_url)
2419 })
2420 .await??;
2421
2422 Ok(proto::Ack {})
2423 }
2424
2425 async fn handle_delete_branch(
2426 this: Entity<Self>,
2427 envelope: TypedEnvelope<proto::GitDeleteBranch>,
2428 mut cx: AsyncApp,
2429 ) -> Result<proto::Ack> {
2430 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2431 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2432 let branch_name = envelope.payload.branch_name;
2433
2434 repository_handle
2435 .update(&mut cx, |repository_handle, _| {
2436 repository_handle.delete_branch(branch_name)
2437 })
2438 .await??;
2439
2440 Ok(proto::Ack {})
2441 }
2442
2443 async fn handle_remove_remote(
2444 this: Entity<Self>,
2445 envelope: TypedEnvelope<proto::GitRemoveRemote>,
2446 mut cx: AsyncApp,
2447 ) -> Result<proto::Ack> {
2448 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2449 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2450 let remote_name = envelope.payload.remote_name;
2451
2452 repository_handle
2453 .update(&mut cx, |repository_handle, _| {
2454 repository_handle.remove_remote(remote_name)
2455 })
2456 .await??;
2457
2458 Ok(proto::Ack {})
2459 }
2460
2461 async fn handle_show(
2462 this: Entity<Self>,
2463 envelope: TypedEnvelope<proto::GitShow>,
2464 mut cx: AsyncApp,
2465 ) -> Result<proto::GitCommitDetails> {
2466 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2467 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2468
2469 let commit = repository_handle
2470 .update(&mut cx, |repository_handle, _| {
2471 repository_handle.show(envelope.payload.commit)
2472 })
2473 .await??;
2474 Ok(proto::GitCommitDetails {
2475 sha: commit.sha.into(),
2476 message: commit.message.into(),
2477 commit_timestamp: commit.commit_timestamp,
2478 author_email: commit.author_email.into(),
2479 author_name: commit.author_name.into(),
2480 })
2481 }
2482
2483 async fn handle_load_commit_diff(
2484 this: Entity<Self>,
2485 envelope: TypedEnvelope<proto::LoadCommitDiff>,
2486 mut cx: AsyncApp,
2487 ) -> Result<proto::LoadCommitDiffResponse> {
2488 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2489 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2490
2491 let commit_diff = repository_handle
2492 .update(&mut cx, |repository_handle, _| {
2493 repository_handle.load_commit_diff(envelope.payload.commit)
2494 })
2495 .await??;
2496 Ok(proto::LoadCommitDiffResponse {
2497 files: commit_diff
2498 .files
2499 .into_iter()
2500 .map(|file| proto::CommitFile {
2501 path: file.path.to_proto(),
2502 old_text: file.old_text,
2503 new_text: file.new_text,
2504 is_binary: file.is_binary,
2505 })
2506 .collect(),
2507 })
2508 }
2509
2510 async fn handle_file_history(
2511 this: Entity<Self>,
2512 envelope: TypedEnvelope<proto::GitFileHistory>,
2513 mut cx: AsyncApp,
2514 ) -> Result<proto::GitFileHistoryResponse> {
2515 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2516 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2517 let path = RepoPath::from_proto(&envelope.payload.path)?;
2518 let skip = envelope.payload.skip as usize;
2519 let limit = envelope.payload.limit.map(|l| l as usize);
2520
2521 let file_history = repository_handle
2522 .update(&mut cx, |repository_handle, _| {
2523 repository_handle.file_history_paginated(path, skip, limit)
2524 })
2525 .await??;
2526
2527 Ok(proto::GitFileHistoryResponse {
2528 entries: file_history
2529 .entries
2530 .into_iter()
2531 .map(|entry| proto::FileHistoryEntry {
2532 sha: entry.sha.to_string(),
2533 subject: entry.subject.to_string(),
2534 message: entry.message.to_string(),
2535 commit_timestamp: entry.commit_timestamp,
2536 author_name: entry.author_name.to_string(),
2537 author_email: entry.author_email.to_string(),
2538 })
2539 .collect(),
2540 path: file_history.path.to_proto(),
2541 })
2542 }
2543
2544 async fn handle_reset(
2545 this: Entity<Self>,
2546 envelope: TypedEnvelope<proto::GitReset>,
2547 mut cx: AsyncApp,
2548 ) -> Result<proto::Ack> {
2549 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2550 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2551
2552 let mode = match envelope.payload.mode() {
2553 git_reset::ResetMode::Soft => ResetMode::Soft,
2554 git_reset::ResetMode::Mixed => ResetMode::Mixed,
2555 };
2556
2557 repository_handle
2558 .update(&mut cx, |repository_handle, cx| {
2559 repository_handle.reset(envelope.payload.commit, mode, cx)
2560 })
2561 .await??;
2562 Ok(proto::Ack {})
2563 }
2564
2565 async fn handle_checkout_files(
2566 this: Entity<Self>,
2567 envelope: TypedEnvelope<proto::GitCheckoutFiles>,
2568 mut cx: AsyncApp,
2569 ) -> Result<proto::Ack> {
2570 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2571 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2572 let paths = envelope
2573 .payload
2574 .paths
2575 .iter()
2576 .map(|s| RepoPath::from_proto(s))
2577 .collect::<Result<Vec<_>>>()?;
2578
2579 repository_handle
2580 .update(&mut cx, |repository_handle, cx| {
2581 repository_handle.checkout_files(&envelope.payload.commit, paths, cx)
2582 })
2583 .await?;
2584 Ok(proto::Ack {})
2585 }
2586
    /// Opens (or creates) the commit-message buffer for the addressed
    /// repository, replicates it to the requesting peer, and returns the
    /// buffer's remote id so the client can open its own handle to it.
    async fn handle_open_commit_message_buffer(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::OpenCommitMessageBuffer>,
        mut cx: AsyncApp,
    ) -> Result<proto::OpenBufferResponse> {
        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
        let repository = Self::repository_for_request(&this, repository_id, &mut cx)?;
        let buffer = repository
            .update(&mut cx, |repository, cx| {
                repository.open_commit_buffer(None, this.read(cx).buffer_store.clone(), cx)
            })
            .await?;

        let buffer_id = buffer.read_with(&cx, |buffer, _| buffer.remote_id());
        this.update(&mut cx, |this, cx| {
            this.buffer_store.update(cx, |buffer_store, cx| {
                // Replicate to the original sender, falling back to the direct
                // sender when the message was not forwarded through another peer.
                buffer_store
                    .create_buffer_for_peer(
                        &buffer,
                        envelope.original_sender_id.unwrap_or(envelope.sender_id),
                        cx,
                    )
                    .detach_and_log_err(cx);
            })
        });

        Ok(proto::OpenBufferResponse {
            buffer_id: buffer_id.to_proto(),
        })
    }
2617
2618 async fn handle_askpass(
2619 this: Entity<Self>,
2620 envelope: TypedEnvelope<proto::AskPassRequest>,
2621 mut cx: AsyncApp,
2622 ) -> Result<proto::AskPassResponse> {
2623 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2624 let repository = Self::repository_for_request(&this, repository_id, &mut cx)?;
2625
2626 let delegates = cx.update(|cx| repository.read(cx).askpass_delegates.clone());
2627 let Some(mut askpass) = delegates.lock().remove(&envelope.payload.askpass_id) else {
2628 debug_panic!("no askpass found");
2629 anyhow::bail!("no askpass found");
2630 };
2631
2632 let response = askpass
2633 .ask_password(envelope.payload.prompt)
2634 .await
2635 .ok_or_else(|| anyhow::anyhow!("askpass cancelled"))?;
2636
2637 delegates
2638 .lock()
2639 .insert(envelope.payload.askpass_id, askpass);
2640
2641 // In fact, we don't quite know what we're doing here, as we're sending askpass password unencrypted, but..
2642 Ok(proto::AskPassResponse {
2643 response: response.decrypt(IKnowWhatIAmDoingAndIHaveReadTheDocs)?,
2644 })
2645 }
2646
2647 async fn handle_check_for_pushed_commits(
2648 this: Entity<Self>,
2649 envelope: TypedEnvelope<proto::CheckForPushedCommits>,
2650 mut cx: AsyncApp,
2651 ) -> Result<proto::CheckForPushedCommitsResponse> {
2652 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2653 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2654
2655 let branches = repository_handle
2656 .update(&mut cx, |repository_handle, _| {
2657 repository_handle.check_for_pushed_commits()
2658 })
2659 .await??;
2660 Ok(proto::CheckForPushedCommitsResponse {
2661 pushed_to: branches
2662 .into_iter()
2663 .map(|commit| commit.to_string())
2664 .collect(),
2665 })
2666 }
2667
2668 async fn handle_git_diff(
2669 this: Entity<Self>,
2670 envelope: TypedEnvelope<proto::GitDiff>,
2671 mut cx: AsyncApp,
2672 ) -> Result<proto::GitDiffResponse> {
2673 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2674 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2675 let diff_type = match envelope.payload.diff_type() {
2676 proto::git_diff::DiffType::HeadToIndex => DiffType::HeadToIndex,
2677 proto::git_diff::DiffType::HeadToWorktree => DiffType::HeadToWorktree,
2678 proto::git_diff::DiffType::MergeBase => {
2679 let base_ref = envelope
2680 .payload
2681 .merge_base_ref
2682 .ok_or_else(|| anyhow!("merge_base_ref is required for MergeBase diff type"))?;
2683 DiffType::MergeBase {
2684 base_ref: base_ref.into(),
2685 }
2686 }
2687 };
2688
2689 let mut diff = repository_handle
2690 .update(&mut cx, |repository_handle, cx| {
2691 repository_handle.diff(diff_type, cx)
2692 })
2693 .await??;
2694 const ONE_MB: usize = 1_000_000;
2695 if diff.len() > ONE_MB {
2696 diff = diff.chars().take(ONE_MB).collect()
2697 }
2698
2699 Ok(proto::GitDiffResponse { diff })
2700 }
2701
2702 async fn handle_git_diff_stat(
2703 this: Entity<Self>,
2704 envelope: TypedEnvelope<proto::GitDiffStat>,
2705 mut cx: AsyncApp,
2706 ) -> Result<proto::GitDiffStatResponse> {
2707 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2708 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2709 let diff_type = match envelope.payload.diff_type() {
2710 proto::git_diff_stat::DiffType::HeadToIndex => DiffType::HeadToIndex,
2711 proto::git_diff_stat::DiffType::HeadToWorktree => DiffType::HeadToWorktree,
2712 proto::git_diff_stat::DiffType::MergeBase => {
2713 let base_ref = envelope
2714 .payload
2715 .merge_base_ref
2716 .ok_or_else(|| anyhow!("merge_base_ref is required for MergeBase diff type"))?;
2717 DiffType::MergeBase {
2718 base_ref: base_ref.into(),
2719 }
2720 }
2721 };
2722
2723 let stats = repository_handle
2724 .update(&mut cx, |repository_handle, cx| {
2725 repository_handle.diff_stat(diff_type, cx)
2726 })
2727 .await??;
2728
2729 let entries = stats
2730 .into_iter()
2731 .map(|(path, stat)| proto::GitDiffStatEntry {
2732 path: path.to_proto(),
2733 added: stat.added,
2734 deleted: stat.deleted,
2735 })
2736 .collect();
2737
2738 Ok(proto::GitDiffStatResponse { entries })
2739 }
2740
    /// Computes a tree-level diff between two revisions of the addressed
    /// repository and serializes each entry's status (and, for modified or
    /// deleted files, the old blob oid) for the remote peer.
    async fn handle_tree_diff(
        this: Entity<Self>,
        request: TypedEnvelope<proto::GetTreeDiff>,
        mut cx: AsyncApp,
    ) -> Result<proto::GetTreeDiffResponse> {
        let repository_id = RepositoryId(request.payload.repository_id);
        // `is_merge` selects a merge-base diff (changes since the common
        // ancestor of `base` and `head`) rather than a direct base..head diff.
        let diff_type = if request.payload.is_merge {
            DiffTreeType::MergeBase {
                base: request.payload.base.into(),
                head: request.payload.head.into(),
            }
        } else {
            DiffTreeType::Since {
                base: request.payload.base.into(),
                head: request.payload.head.into(),
            }
        };

        let diff = this
            .update(&mut cx, |this, cx| {
                let repository = this.repositories().get(&repository_id)?;
                Some(repository.update(cx, |repo, cx| repo.diff_tree(diff_type, cx)))
            })
            .context("missing repository")?
            .await??;

        Ok(proto::GetTreeDiffResponse {
            entries: diff
                .entries
                .into_iter()
                .map(|(path, status)| proto::TreeDiffStatus {
                    path: path.as_ref().to_proto(),
                    status: match status {
                        TreeDiffStatus::Added {} => proto::tree_diff_status::Status::Added.into(),
                        TreeDiffStatus::Modified { .. } => {
                            proto::tree_diff_status::Status::Modified.into()
                        }
                        TreeDiffStatus::Deleted { .. } => {
                            proto::tree_diff_status::Status::Deleted.into()
                        }
                    },
                    // Only modified/deleted entries carry a previous blob oid;
                    // added files have no prior version.
                    oid: match status {
                        TreeDiffStatus::Deleted { old } | TreeDiffStatus::Modified { old } => {
                            Some(old.to_string())
                        }
                        TreeDiffStatus::Added => None,
                    },
                })
                .collect(),
        })
    }
2792
2793 async fn handle_get_blob_content(
2794 this: Entity<Self>,
2795 request: TypedEnvelope<proto::GetBlobContent>,
2796 mut cx: AsyncApp,
2797 ) -> Result<proto::GetBlobContentResponse> {
2798 let oid = git::Oid::from_str(&request.payload.oid)?;
2799 let repository_id = RepositoryId(request.payload.repository_id);
2800 let content = this
2801 .update(&mut cx, |this, cx| {
2802 let repository = this.repositories().get(&repository_id)?;
2803 Some(repository.update(cx, |repo, cx| repo.load_blob_content(oid, cx)))
2804 })
2805 .context("missing repository")?
2806 .await?;
2807 Ok(proto::GetBlobContentResponse { content })
2808 }
2809
    /// Opens the unstaged diff for a buffer, records it as shared with the
    /// requesting peer, and returns the diff's base (staged) text.
    async fn handle_open_unstaged_diff(
        this: Entity<Self>,
        request: TypedEnvelope<proto::OpenUnstagedDiff>,
        mut cx: AsyncApp,
    ) -> Result<proto::OpenUnstagedDiffResponse> {
        let buffer_id = BufferId::new(request.payload.buffer_id)?;
        let diff = this
            .update(&mut cx, |this, cx| {
                let buffer = this.buffer_store.read(cx).get(buffer_id)?;
                Some(this.open_unstaged_diff(buffer, cx))
            })
            .context("missing buffer")?
            .await?;
        // Record the diff against the requesting peer; holding the entity in
        // `shared_diffs` keeps it alive for as long as it is shared.
        this.update(&mut cx, |this, _| {
            let shared_diffs = this
                .shared_diffs
                .entry(request.original_sender_id.unwrap_or(request.sender_id))
                .or_default();
            shared_diffs.entry(buffer_id).or_default().unstaged = Some(diff.clone());
        });
        let staged_text = diff.read_with(&cx, |diff, cx| diff.base_text_string(cx));
        Ok(proto::OpenUnstagedDiffResponse { staged_text })
    }
2833
    /// Opens the uncommitted diff for a buffer, records it as shared with the
    /// requesting peer, and returns the committed and staged base texts.
    ///
    /// The response `mode` tells the client how to interpret the texts:
    /// `IndexMatchesHead` means the staged text is identical to the committed
    /// text and is therefore omitted from the response.
    async fn handle_open_uncommitted_diff(
        this: Entity<Self>,
        request: TypedEnvelope<proto::OpenUncommittedDiff>,
        mut cx: AsyncApp,
    ) -> Result<proto::OpenUncommittedDiffResponse> {
        let buffer_id = BufferId::new(request.payload.buffer_id)?;
        let diff = this
            .update(&mut cx, |this, cx| {
                let buffer = this.buffer_store.read(cx).get(buffer_id)?;
                Some(this.open_uncommitted_diff(buffer, cx))
            })
            .context("missing buffer")?
            .await?;
        // Record the diff against the requesting peer; holding the entity in
        // `shared_diffs` keeps it alive for as long as it is shared.
        this.update(&mut cx, |this, _| {
            let shared_diffs = this
                .shared_diffs
                .entry(request.original_sender_id.unwrap_or(request.sender_id))
                .or_default();
            shared_diffs.entry(buffer_id).or_default().uncommitted = Some(diff.clone());
        });
        Ok(diff.read_with(&cx, |diff, cx| {
            use proto::open_uncommitted_diff_response::Mode;

            // The secondary diff (when present) tracks the index; its base
            // text is the staged snapshot.
            let unstaged_diff = diff.secondary_diff();
            let index_snapshot = unstaged_diff.and_then(|diff| {
                let diff = diff.read(cx);
                diff.base_text_exists().then(|| diff.base_text(cx))
            });

            let mode;
            let staged_text;
            let committed_text;
            if diff.base_text_exists() {
                let committed_snapshot = diff.base_text(cx);
                committed_text = Some(committed_snapshot.text());
                if let Some(index_text) = index_snapshot {
                    if index_text.remote_id() == committed_snapshot.remote_id() {
                        // Same underlying snapshot: avoid sending the text twice.
                        mode = Mode::IndexMatchesHead;
                        staged_text = None;
                    } else {
                        mode = Mode::IndexAndHead;
                        staged_text = Some(index_text.text());
                    }
                } else {
                    mode = Mode::IndexAndHead;
                    staged_text = None;
                }
            } else {
                // No committed base text (e.g. the file is absent from HEAD);
                // send whatever staged text exists on its own.
                mode = Mode::IndexAndHead;
                committed_text = None;
                staged_text = index_snapshot.as_ref().map(|buffer| buffer.text());
            }

            proto::OpenUncommittedDiffResponse {
                committed_text,
                staged_text,
                mode: mode.into(),
            }
        }))
    }
2894
    /// Applies diff base texts received over RPC to the diff state of a
    /// locally tracked buffer.
    ///
    /// Updates for buffers that are no longer tracked (no diff state or no
    /// buffer) are silently dropped, as they are stale.
    async fn handle_update_diff_bases(
        this: Entity<Self>,
        request: TypedEnvelope<proto::UpdateDiffBases>,
        mut cx: AsyncApp,
    ) -> Result<()> {
        let buffer_id = BufferId::new(request.payload.buffer_id)?;
        this.update(&mut cx, |this, cx| {
            if let Some(diff_state) = this.diffs.get_mut(&buffer_id)
                && let Some(buffer) = this.buffer_store.read(cx).get(buffer_id)
            {
                let buffer = buffer.read(cx).text_snapshot();
                diff_state.update(cx, |diff_state, cx| {
                    diff_state.handle_base_texts_updated(buffer, request.payload, cx);
                })
            }
        });
        Ok(())
    }
2913
2914 async fn handle_blame_buffer(
2915 this: Entity<Self>,
2916 envelope: TypedEnvelope<proto::BlameBuffer>,
2917 mut cx: AsyncApp,
2918 ) -> Result<proto::BlameBufferResponse> {
2919 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2920 let version = deserialize_version(&envelope.payload.version);
2921 let buffer = this.read_with(&cx, |this, cx| {
2922 this.buffer_store.read(cx).get_existing(buffer_id)
2923 })?;
2924 buffer
2925 .update(&mut cx, |buffer, _| {
2926 buffer.wait_for_version(version.clone())
2927 })
2928 .await?;
2929 let blame = this
2930 .update(&mut cx, |this, cx| {
2931 this.blame_buffer(&buffer, Some(version), cx)
2932 })
2933 .await?;
2934 Ok(serialize_blame_buffer_response(blame))
2935 }
2936
2937 async fn handle_get_permalink_to_line(
2938 this: Entity<Self>,
2939 envelope: TypedEnvelope<proto::GetPermalinkToLine>,
2940 mut cx: AsyncApp,
2941 ) -> Result<proto::GetPermalinkToLineResponse> {
2942 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2943 // let version = deserialize_version(&envelope.payload.version);
2944 let selection = {
2945 let proto_selection = envelope
2946 .payload
2947 .selection
2948 .context("no selection to get permalink for defined")?;
2949 proto_selection.start as u32..proto_selection.end as u32
2950 };
2951 let buffer = this.read_with(&cx, |this, cx| {
2952 this.buffer_store.read(cx).get_existing(buffer_id)
2953 })?;
2954 let permalink = this
2955 .update(&mut cx, |this, cx| {
2956 this.get_permalink_to_line(&buffer, selection, cx)
2957 })
2958 .await?;
2959 Ok(proto::GetPermalinkToLineResponse {
2960 permalink: permalink.to_string(),
2961 })
2962 }
2963
2964 fn repository_for_request(
2965 this: &Entity<Self>,
2966 id: RepositoryId,
2967 cx: &mut AsyncApp,
2968 ) -> Result<Entity<Repository>> {
2969 this.read_with(cx, |this, _| {
2970 this.repositories
2971 .get(&id)
2972 .context("missing repository handle")
2973 .cloned()
2974 })
2975 }
2976
2977 pub fn repo_snapshots(&self, cx: &App) -> HashMap<RepositoryId, RepositorySnapshot> {
2978 self.repositories
2979 .iter()
2980 .map(|(id, repo)| (*id, repo.read(cx).snapshot.clone()))
2981 .collect()
2982 }
2983
    /// Groups a worktree's updated entry paths by the repository that contains
    /// them, assigning each path to its innermost repository.
    ///
    /// Returns a background task resolving to repo-relative paths per
    /// repository; paths that fall outside every repository are dropped.
    fn process_updated_entries(
        &self,
        worktree: &Entity<Worktree>,
        updated_entries: &[(Arc<RelPath>, ProjectEntryId, PathChange)],
        cx: &mut App,
    ) -> Task<HashMap<Entity<Repository>, Vec<RepoPath>>> {
        let path_style = worktree.read(cx).path_style();
        let mut repo_paths = self
            .repositories
            .values()
            .map(|repo| (repo.read(cx).work_directory_abs_path.clone(), repo.clone()))
            .collect::<Vec<_>>();
        let mut entries: Vec<_> = updated_entries
            .iter()
            .map(|(path, _, _)| path.clone())
            .collect();
        // Sorting lets each repository claim a contiguous range of paths via
        // `partition_point` below.
        entries.sort();
        let worktree = worktree.read(cx);

        let entries = entries
            .into_iter()
            .map(|path| worktree.absolutize(&path))
            .collect::<Arc<[_]>>();

        let executor = cx.background_executor().clone();
        cx.background_executor().spawn(async move {
            repo_paths.sort_by(|lhs, rhs| lhs.0.cmp(&rhs.0));
            let mut paths_by_git_repo = HashMap::<_, Vec<_>>::default();
            // One background task per repository; `FuturesOrdered` preserves
            // the (reversed, i.e. deepest-first) submission order of results.
            let mut tasks = FuturesOrdered::new();
            for (repo_path, repo) in repo_paths.into_iter().rev() {
                let entries = entries.clone();
                let task = executor.spawn(async move {
                    // Find all repository paths that belong to this repo
                    let mut ix = entries.partition_point(|path| path < &*repo_path);
                    if ix == entries.len() {
                        return None;
                    };

                    let mut paths = Vec::new();
                    // All paths prefixed by a given repo will constitute a continuous range.
                    while let Some(path) = entries.get(ix)
                        && let Some(repo_path) = RepositorySnapshot::abs_path_to_repo_path_inner(
                            &repo_path, path, path_style,
                        )
                    {
                        // Keep the entry's index so duplicates across nested
                        // repositories can be filtered out below.
                        paths.push((repo_path, ix));
                        ix += 1;
                    }
                    if paths.is_empty() {
                        None
                    } else {
                        Some((repo, paths))
                    }
                });
                tasks.push_back(task);
            }

            // Now, let's filter out the "duplicate" entries that were processed by multiple distinct repos.
            let mut path_was_used = vec![false; entries.len()];
            let tasks = tasks.collect::<Vec<_>>().await;
            // Process tasks from the back: iterating backwards allows us to see more-specific paths first.
            // We always want to assign a path to its innermost repository.
            for t in tasks {
                let Some((repo, paths)) = t else {
                    continue;
                };
                let entry = paths_by_git_repo.entry(repo).or_default();
                for (repo_path, ix) in paths {
                    if path_was_used[ix] {
                        continue;
                    }
                    path_was_used[ix] = true;
                    entry.push(repo_path);
                }
            }

            paths_by_git_repo
        })
    }
3063}
3064
3065impl BufferGitState {
    /// Creates an empty per-buffer git state: no diffs, no base texts, and no
    /// conflict set have been computed yet.
    fn new(_git_store: WeakEntity<GitStore>) -> Self {
        Self {
            unstaged_diff: Default::default(),
            uncommitted_diff: Default::default(),
            oid_diffs: Default::default(),
            recalculate_diff_task: Default::default(),
            language: Default::default(),
            language_registry: Default::default(),
            // Starts at `false`: no diff recalculation is in flight yet.
            recalculating_tx: postage::watch::channel_with(false).0,
            hunk_staging_operation_count: 0,
            hunk_staging_operation_count_as_of_write: 0,
            head_text: Default::default(),
            index_text: Default::default(),
            oid_texts: Default::default(),
            head_changed: Default::default(),
            index_changed: Default::default(),
            language_changed: Default::default(),
            conflict_updated_futures: Default::default(),
            conflict_set: Default::default(),
            reparse_conflict_markers_task: Default::default(),
        }
    }
3088
3089 #[ztracing::instrument(skip_all)]
3090 fn buffer_language_changed(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) {
3091 self.language = buffer.read(cx).language().cloned();
3092 self.language_changed = true;
3093 let _ = self.recalculate_diffs(buffer.read(cx).text_snapshot(), cx);
3094 }
3095
    /// Schedules a background reparse of this buffer's conflict markers,
    /// updating the associated `ConflictSet` with the new snapshot.
    ///
    /// The returned receiver resolves once the conflict set has been updated.
    /// If there is no live conflict set, or the conflict set currently reports
    /// no conflict, no work is scheduled and the receiver is simply cancelled
    /// (its sender is dropped).
    fn reparse_conflict_markers(
        &mut self,
        buffer: text::BufferSnapshot,
        cx: &mut Context<Self>,
    ) -> oneshot::Receiver<()> {
        let (tx, rx) = oneshot::channel();

        let Some(conflict_set) = self
            .conflict_set
            .as_ref()
            .and_then(|conflict_set| conflict_set.upgrade())
        else {
            return rx;
        };

        // Only reparse when the conflict set believes there is a conflict;
        // otherwise the old snapshot would have nothing to compare against.
        let old_snapshot = conflict_set.read_with(cx, |conflict_set, _| {
            if conflict_set.has_conflict {
                Some(conflict_set.snapshot())
            } else {
                None
            }
        });

        if let Some(old_snapshot) = old_snapshot {
            self.conflict_updated_futures.push(tx);
            // Replacing the task cancels any previously scheduled reparse.
            self.reparse_conflict_markers_task = Some(cx.spawn(async move |this, cx| {
                // Parse and diff on a background thread; only the final
                // snapshot swap happens on the main thread.
                let (snapshot, changed_range) = cx
                    .background_spawn(async move {
                        let new_snapshot = ConflictSet::parse(&buffer);
                        let changed_range = old_snapshot.compare(&new_snapshot, &buffer);
                        (new_snapshot, changed_range)
                    })
                    .await;
                this.update(cx, |this, cx| {
                    if let Some(conflict_set) = &this.conflict_set {
                        conflict_set
                            .update(cx, |conflict_set, cx| {
                                conflict_set.set_snapshot(snapshot, changed_range, cx);
                            })
                            .ok();
                    }
                    // Resolve every waiter, including ones queued by earlier
                    // (now superseded) reparse requests.
                    let futures = std::mem::take(&mut this.conflict_updated_futures);
                    for tx in futures {
                        tx.send(()).ok();
                    }
                })
            }))
        }

        rx
    }
3147
3148 fn unstaged_diff(&self) -> Option<Entity<BufferDiff>> {
3149 self.unstaged_diff.as_ref().and_then(|set| set.upgrade())
3150 }
3151
3152 fn uncommitted_diff(&self) -> Option<Entity<BufferDiff>> {
3153 self.uncommitted_diff.as_ref().and_then(|set| set.upgrade())
3154 }
3155
3156 fn oid_diff(&self, oid: Option<git::Oid>) -> Option<Entity<BufferDiff>> {
3157 self.oid_diffs.get(&oid).and_then(|weak| weak.upgrade())
3158 }
3159
3160 fn handle_base_texts_updated(
3161 &mut self,
3162 buffer: text::BufferSnapshot,
3163 message: proto::UpdateDiffBases,
3164 cx: &mut Context<Self>,
3165 ) {
3166 use proto::update_diff_bases::Mode;
3167
3168 let Some(mode) = Mode::from_i32(message.mode) else {
3169 return;
3170 };
3171
3172 let diff_bases_change = match mode {
3173 Mode::HeadOnly => DiffBasesChange::SetHead(message.committed_text),
3174 Mode::IndexOnly => DiffBasesChange::SetIndex(message.staged_text),
3175 Mode::IndexMatchesHead => DiffBasesChange::SetBoth(message.committed_text),
3176 Mode::IndexAndHead => DiffBasesChange::SetEach {
3177 index: message.staged_text,
3178 head: message.committed_text,
3179 },
3180 };
3181
3182 self.diff_bases_changed(buffer, Some(diff_bases_change), cx);
3183 }
3184
3185 pub fn wait_for_recalculation(&mut self) -> Option<impl Future<Output = ()> + use<>> {
3186 if *self.recalculating_tx.borrow() {
3187 let mut rx = self.recalculating_tx.subscribe();
3188 Some(async move {
3189 loop {
3190 let is_recalculating = rx.recv().await;
3191 if is_recalculating != Some(true) {
3192 break;
3193 }
3194 }
3195 })
3196 } else {
3197 None
3198 }
3199 }
3200
3201 fn diff_bases_changed(
3202 &mut self,
3203 buffer: text::BufferSnapshot,
3204 diff_bases_change: Option<DiffBasesChange>,
3205 cx: &mut Context<Self>,
3206 ) {
3207 match diff_bases_change {
3208 Some(DiffBasesChange::SetIndex(index)) => {
3209 self.index_text = index.map(|mut index| {
3210 text::LineEnding::normalize(&mut index);
3211 Arc::from(index.as_str())
3212 });
3213 self.index_changed = true;
3214 }
3215 Some(DiffBasesChange::SetHead(head)) => {
3216 self.head_text = head.map(|mut head| {
3217 text::LineEnding::normalize(&mut head);
3218 Arc::from(head.as_str())
3219 });
3220 self.head_changed = true;
3221 }
3222 Some(DiffBasesChange::SetBoth(text)) => {
3223 let text = text.map(|mut text| {
3224 text::LineEnding::normalize(&mut text);
3225 Arc::from(text.as_str())
3226 });
3227 self.head_text = text.clone();
3228 self.index_text = text;
3229 self.head_changed = true;
3230 self.index_changed = true;
3231 }
3232 Some(DiffBasesChange::SetEach { index, head }) => {
3233 self.index_text = index.map(|mut index| {
3234 text::LineEnding::normalize(&mut index);
3235 Arc::from(index.as_str())
3236 });
3237 self.index_changed = true;
3238 self.head_text = head.map(|mut head| {
3239 text::LineEnding::normalize(&mut head);
3240 Arc::from(head.as_str())
3241 });
3242 self.head_changed = true;
3243 }
3244 None => {}
3245 }
3246
3247 self.recalculate_diffs(buffer, cx)
3248 }
3249
    /// Recomputes the unstaged, uncommitted, and per-oid diffs for `buffer`
    /// against the cached index/head/oid base texts, then publishes the new
    /// snapshots to the corresponding `BufferDiff` entities.
    ///
    /// The work runs in a spawned task stored in `recalculate_diff_task`
    /// (replacing it cancels any previous run); `recalculating_tx` broadcasts
    /// whether a recalculation is in flight. The run is aborted if hunk
    /// staging operations occurred since the captured count.
    #[ztracing::instrument(skip_all)]
    fn recalculate_diffs(&mut self, buffer: text::BufferSnapshot, cx: &mut Context<Self>) {
        // Signal to `wait_for_recalculation` callers that work is in flight.
        *self.recalculating_tx.borrow_mut() = true;

        let language = self.language.clone();
        let language_registry = self.language_registry.clone();
        let unstaged_diff = self.unstaged_diff();
        let uncommitted_diff = self.uncommitted_diff();
        let head = self.head_text.clone();
        let index = self.index_text.clone();
        let index_changed = self.index_changed;
        let head_changed = self.head_changed;
        let language_changed = self.language_changed;
        let prev_hunk_staging_operation_count = self.hunk_staging_operation_count_as_of_write;
        // Cheap pointer comparison: when head and index were set together
        // (`DiffBasesChange::SetBoth`) they share one Arc, letting us reuse
        // the unstaged diff as the uncommitted diff below.
        let index_matches_head = match (self.index_text.as_ref(), self.head_text.as_ref()) {
            (Some(index), Some(head)) => Arc::ptr_eq(index, head),
            (None, None) => true,
            _ => false,
        };

        // Snapshot the still-live oid diffs (and their base texts) to update.
        let oid_diffs: Vec<(Option<git::Oid>, Entity<BufferDiff>, Option<Arc<str>>)> = self
            .oid_diffs
            .iter()
            .filter_map(|(oid, weak)| {
                let base_text = oid.and_then(|oid| self.oid_texts.get(&oid).cloned());
                weak.upgrade().map(|diff| (*oid, diff, base_text))
            })
            .collect();

        // Prune dead oid diffs along with their cached base texts.
        self.oid_diffs.retain(|oid, weak| {
            let alive = weak.upgrade().is_some();
            if !alive {
                if let Some(oid) = oid {
                    self.oid_texts.remove(oid);
                }
            }
            alive
        });
        self.recalculate_diff_task = Some(cx.spawn(async move |this, cx| {
            log::debug!(
                "start recalculating diffs for buffer {}",
                buffer.remote_id()
            );

            let mut new_unstaged_diff = None;
            if let Some(unstaged_diff) = &unstaged_diff {
                new_unstaged_diff = Some(
                    cx.update(|cx| {
                        unstaged_diff.read(cx).update_diff(
                            buffer.clone(),
                            index,
                            index_changed.then_some(false),
                            language.clone(),
                            cx,
                        )
                    })
                    .await,
                );
            }

            // Dropping BufferDiff can be expensive, so yield back to the event loop
            // for a bit
            yield_now().await;

            let mut new_uncommitted_diff = None;
            if let Some(uncommitted_diff) = &uncommitted_diff {
                // When the index matches head, the unstaged diff computed
                // above is identical to the uncommitted diff — reuse it.
                new_uncommitted_diff = if index_matches_head {
                    new_unstaged_diff.clone()
                } else {
                    Some(
                        cx.update(|cx| {
                            uncommitted_diff.read(cx).update_diff(
                                buffer.clone(),
                                head,
                                head_changed.then_some(true),
                                language.clone(),
                                cx,
                            )
                        })
                        .await,
                    )
                }
            }

            // Dropping BufferDiff can be expensive, so yield back to the event loop
            // for a bit
            yield_now().await;

            let cancel = this.update(cx, |this, _| {
                // This checks whether all pending stage/unstage operations
                // have quiesced (i.e. both the corresponding write and the
                // read of that write have completed). If not, then we cancel
                // this recalculation attempt to avoid invalidating pending
                // state too quickly; another recalculation will come along
                // later and clear the pending state once the state of the index has settled.
                if this.hunk_staging_operation_count > prev_hunk_staging_operation_count {
                    *this.recalculating_tx.borrow_mut() = false;
                    true
                } else {
                    false
                }
            })?;
            if cancel {
                log::debug!(
                    concat!(
                        "aborting recalculating diffs for buffer {}",
                        "due to subsequent hunk operations",
                    ),
                    buffer.remote_id()
                );
                return Ok(());
            }

            // Publish the unstaged snapshot first, remembering which range
            // changed so the uncommitted diff can reuse it below.
            let unstaged_changed_range = if let Some((unstaged_diff, new_unstaged_diff)) =
                unstaged_diff.as_ref().zip(new_unstaged_diff.clone())
            {
                let task = unstaged_diff.update(cx, |diff, cx| {
                    // For git index buffer we skip assigning the language as we do not really need to perform any syntax highlighting on
                    // it. As a result, by skipping it we are potentially shaving off a lot of RSS plus we get a snappier feel for large diff
                    // view multibuffers.
                    diff.set_snapshot(new_unstaged_diff, &buffer, cx)
                });
                Some(task.await)
            } else {
                None
            };

            yield_now().await;

            if let Some((uncommitted_diff, new_uncommitted_diff)) =
                uncommitted_diff.as_ref().zip(new_uncommitted_diff.clone())
            {
                uncommitted_diff
                    .update(cx, |diff, cx| {
                        if language_changed {
                            diff.language_changed(language.clone(), language_registry, cx);
                        }
                        diff.set_snapshot_with_secondary(
                            new_uncommitted_diff,
                            &buffer,
                            unstaged_changed_range.flatten(),
                            true,
                            cx,
                        )
                    })
                    .await;
            }

            yield_now().await;

            // Finally, refresh each diff pinned to a specific commit oid.
            for (oid, oid_diff, base_text) in oid_diffs {
                let new_oid_diff = cx
                    .update(|cx| {
                        oid_diff.read(cx).update_diff(
                            buffer.clone(),
                            base_text,
                            None,
                            language.clone(),
                            cx,
                        )
                    })
                    .await;

                oid_diff
                    .update(cx, |diff, cx| diff.set_snapshot(new_oid_diff, &buffer, cx))
                    .await;

                log::debug!(
                    "finished recalculating oid diff for buffer {} oid {:?}",
                    buffer.remote_id(),
                    oid
                );

                yield_now().await;
            }

            log::debug!(
                "finished recalculating diffs for buffer {}",
                buffer.remote_id()
            );

            // Clear the dirty flags and announce that recalculation is done.
            if let Some(this) = this.upgrade() {
                this.update(cx, |this, _| {
                    this.index_changed = false;
                    this.head_changed = false;
                    this.language_changed = false;
                    *this.recalculating_tx.borrow_mut() = false;
                });
            }

            Ok(())
        }));
    }
3443}
3444
/// Builds an [`AskPassDelegate`] that forwards credential prompts from a local
/// git operation to the downstream client identified by `project_id` and
/// `askpass_id`, sending back the (encrypted) response.
fn make_remote_delegate(
    this: Entity<GitStore>,
    project_id: u64,
    repository_id: RepositoryId,
    askpass_id: u64,
    cx: &mut AsyncApp,
) -> AskPassDelegate {
    AskPassDelegate::new(cx, move |prompt, tx, cx| {
        this.update(cx, |this, cx| {
            // No downstream client: drop `tx` so the prompt is cancelled.
            let Some((client, _)) = this.downstream_client() else {
                return;
            };
            let response = client.request(proto::AskPassRequest {
                project_id,
                repository_id: repository_id.to_proto(),
                askpass_id,
                prompt,
            });
            cx.spawn(async move |_, _| {
                let mut response = response.await?.response;
                tx.send(EncryptedPassword::try_from(response.as_ref())?)
                    .ok();
                // Scrub the plaintext reply from memory once it has been
                // encrypted and handed off.
                response.zeroize();
                anyhow::Ok(())
            })
            .detach_and_log_err(cx);
        });
    })
}
3474
3475impl RepositoryId {
3476 pub fn to_proto(self) -> u64 {
3477 self.0
3478 }
3479
3480 pub fn from_proto(id: u64) -> Self {
3481 RepositoryId(id)
3482 }
3483}
3484
3485impl RepositorySnapshot {
3486 fn empty(id: RepositoryId, work_directory_abs_path: Arc<Path>, path_style: PathStyle) -> Self {
3487 Self {
3488 id,
3489 statuses_by_path: Default::default(),
3490 work_directory_abs_path,
3491 branch: None,
3492 head_commit: None,
3493 scan_id: 0,
3494 merge: Default::default(),
3495 remote_origin_url: None,
3496 remote_upstream_url: None,
3497 stash_entries: Default::default(),
3498 path_style,
3499 }
3500 }
3501
3502 fn initial_update(&self, project_id: u64) -> proto::UpdateRepository {
3503 proto::UpdateRepository {
3504 branch_summary: self.branch.as_ref().map(branch_to_proto),
3505 head_commit_details: self.head_commit.as_ref().map(commit_details_to_proto),
3506 updated_statuses: self
3507 .statuses_by_path
3508 .iter()
3509 .map(|entry| entry.to_proto())
3510 .collect(),
3511 removed_statuses: Default::default(),
3512 current_merge_conflicts: self
3513 .merge
3514 .conflicted_paths
3515 .iter()
3516 .map(|repo_path| repo_path.to_proto())
3517 .collect(),
3518 merge_message: self.merge.message.as_ref().map(|msg| msg.to_string()),
3519 project_id,
3520 id: self.id.to_proto(),
3521 abs_path: self.work_directory_abs_path.to_string_lossy().into_owned(),
3522 entry_ids: vec![self.id.to_proto()],
3523 scan_id: self.scan_id,
3524 is_last_update: true,
3525 stash_entries: self
3526 .stash_entries
3527 .entries
3528 .iter()
3529 .map(stash_to_proto)
3530 .collect(),
3531 remote_upstream_url: self.remote_upstream_url.clone(),
3532 remote_origin_url: self.remote_origin_url.clone(),
3533 }
3534 }
3535
    /// Computes a delta `proto::UpdateRepository` message describing how this
    /// snapshot differs from `old`.
    ///
    /// Status entries are compared with a sorted merge-join over both
    /// snapshots' path-ordered status trees: entries only in `self` or with a
    /// changed status go into `updated_statuses`, entries only in `old` go
    /// into `removed_statuses`. Everything else is sent wholesale.
    fn build_update(&self, old: &Self, project_id: u64) -> proto::UpdateRepository {
        let mut updated_statuses: Vec<proto::StatusEntry> = Vec::new();
        let mut removed_statuses: Vec<String> = Vec::new();

        let mut new_statuses = self.statuses_by_path.iter().peekable();
        let mut old_statuses = old.statuses_by_path.iter().peekable();

        let mut current_new_entry = new_statuses.next();
        let mut current_old_entry = old_statuses.next();
        loop {
            match (current_new_entry, current_old_entry) {
                (Some(new_entry), Some(old_entry)) => {
                    match new_entry.repo_path.cmp(&old_entry.repo_path) {
                        Ordering::Less => {
                            // Path only exists in the new snapshot: added.
                            updated_statuses.push(new_entry.to_proto());
                            current_new_entry = new_statuses.next();
                        }
                        Ordering::Equal => {
                            // Present in both: only sent if the status changed.
                            if new_entry.status != old_entry.status {
                                updated_statuses.push(new_entry.to_proto());
                            }
                            current_old_entry = old_statuses.next();
                            current_new_entry = new_statuses.next();
                        }
                        Ordering::Greater => {
                            // Path only exists in the old snapshot: removed.
                            removed_statuses.push(old_entry.repo_path.to_proto());
                            current_old_entry = old_statuses.next();
                        }
                    }
                }
                (None, Some(old_entry)) => {
                    removed_statuses.push(old_entry.repo_path.to_proto());
                    current_old_entry = old_statuses.next();
                }
                (Some(new_entry), None) => {
                    updated_statuses.push(new_entry.to_proto());
                    current_new_entry = new_statuses.next();
                }
                (None, None) => break,
            }
        }

        proto::UpdateRepository {
            branch_summary: self.branch.as_ref().map(branch_to_proto),
            head_commit_details: self.head_commit.as_ref().map(commit_details_to_proto),
            updated_statuses,
            removed_statuses,
            current_merge_conflicts: self
                .merge
                .conflicted_paths
                .iter()
                .map(|path| path.to_proto())
                .collect(),
            merge_message: self.merge.message.as_ref().map(|msg| msg.to_string()),
            project_id,
            id: self.id.to_proto(),
            abs_path: self.work_directory_abs_path.to_string_lossy().into_owned(),
            entry_ids: vec![],
            scan_id: self.scan_id,
            is_last_update: true,
            stash_entries: self
                .stash_entries
                .entries
                .iter()
                .map(stash_to_proto)
                .collect(),
            remote_upstream_url: self.remote_upstream_url.clone(),
            remote_origin_url: self.remote_origin_url.clone(),
        }
    }
3606
3607 pub fn status(&self) -> impl Iterator<Item = StatusEntry> + '_ {
3608 self.statuses_by_path.iter().cloned()
3609 }
3610
3611 pub fn status_summary(&self) -> GitSummary {
3612 self.statuses_by_path.summary().item_summary
3613 }
3614
3615 pub fn status_for_path(&self, path: &RepoPath) -> Option<StatusEntry> {
3616 self.statuses_by_path
3617 .get(&PathKey(path.as_ref().clone()), ())
3618 .cloned()
3619 }
3620
3621 pub fn abs_path_to_repo_path(&self, abs_path: &Path) -> Option<RepoPath> {
3622 Self::abs_path_to_repo_path_inner(&self.work_directory_abs_path, abs_path, self.path_style)
3623 }
3624
3625 fn repo_path_to_abs_path(&self, repo_path: &RepoPath) -> PathBuf {
3626 self.path_style
3627 .join(&self.work_directory_abs_path, repo_path.as_std_path())
3628 .unwrap()
3629 .into()
3630 }
3631
3632 #[inline]
3633 fn abs_path_to_repo_path_inner(
3634 work_directory_abs_path: &Path,
3635 abs_path: &Path,
3636 path_style: PathStyle,
3637 ) -> Option<RepoPath> {
3638 let rel_path = path_style.strip_prefix(abs_path, work_directory_abs_path)?;
3639 Some(RepoPath::from_rel_path(&rel_path))
3640 }
3641
3642 pub fn had_conflict_on_last_merge_head_change(&self, repo_path: &RepoPath) -> bool {
3643 self.merge.conflicted_paths.contains(repo_path)
3644 }
3645
3646 pub fn has_conflict(&self, repo_path: &RepoPath) -> bool {
3647 let had_conflict_on_last_merge_head_change =
3648 self.merge.conflicted_paths.contains(repo_path);
3649 let has_conflict_currently = self
3650 .status_for_path(repo_path)
3651 .is_some_and(|entry| entry.status.is_conflicted());
3652 had_conflict_on_last_merge_head_change || has_conflict_currently
3653 }
3654
3655 /// This is the name that will be displayed in the repository selector for this repository.
3656 pub fn display_name(&self) -> SharedString {
3657 self.work_directory_abs_path
3658 .file_name()
3659 .unwrap_or_default()
3660 .to_string_lossy()
3661 .to_string()
3662 .into()
3663 }
3664}
3665
3666pub fn stash_to_proto(entry: &StashEntry) -> proto::StashEntry {
3667 proto::StashEntry {
3668 oid: entry.oid.as_bytes().to_vec(),
3669 message: entry.message.clone(),
3670 branch: entry.branch.clone(),
3671 index: entry.index as u64,
3672 timestamp: entry.timestamp,
3673 }
3674}
3675
3676pub fn proto_to_stash(entry: &proto::StashEntry) -> Result<StashEntry> {
3677 Ok(StashEntry {
3678 oid: Oid::from_bytes(&entry.oid)?,
3679 message: entry.message.clone(),
3680 index: entry.index as usize,
3681 branch: entry.branch.clone(),
3682 timestamp: entry.timestamp,
3683 })
3684}
3685
impl MergeDetails {
    /// Reads the repository's in-progress-operation heads (merge, cherry-pick,
    /// rebase, revert, apply) and derives the set of conflicted paths.
    ///
    /// Returns the new `MergeDetails` along with a flag indicating whether the
    /// operation heads changed relative to `prev_snapshot`.
    async fn load(
        backend: &Arc<dyn GitRepository>,
        status: &SumTree<StatusEntry>,
        prev_snapshot: &RepositorySnapshot,
    ) -> Result<(MergeDetails, bool)> {
        log::debug!("load merge details");
        let message = backend.merge_message().await;
        // Resolve all potential operation heads in one batch; a failed
        // revparse yields an empty list (all heads treated as absent).
        let heads = backend
            .revparse_batch(vec![
                "MERGE_HEAD".into(),
                "CHERRY_PICK_HEAD".into(),
                "REBASE_HEAD".into(),
                "REVERT_HEAD".into(),
                "APPLY_HEAD".into(),
            ])
            .await
            .log_err()
            .unwrap_or_default()
            .into_iter()
            .map(|opt| opt.map(SharedString::from))
            .collect::<Vec<_>>();
        let merge_heads_changed = heads != prev_snapshot.merge.heads;
        // Conflicted paths are only re-derived when the heads changed;
        // otherwise the previous set is carried forward unchanged.
        let conflicted_paths = if merge_heads_changed {
            let current_conflicted_paths = TreeSet::from_ordered_entries(
                status
                    .iter()
                    .filter(|entry| entry.status.is_conflicted())
                    .map(|entry| entry.repo_path.clone()),
            );

            // It can happen that we run a scan while a lengthy merge is in progress
            // that will eventually result in conflicts, but before those conflicts
            // are reported by `git status`. Since for the moment we only care about
            // the merge heads state for the purposes of tracking conflicts, don't update
            // this state until we see some conflicts.
            if heads.iter().any(Option::is_some)
                && !prev_snapshot.merge.heads.iter().any(Option::is_some)
                && current_conflicted_paths.is_empty()
            {
                log::debug!("not updating merge heads because no conflicts found");
                return Ok((
                    MergeDetails {
                        message: message.map(SharedString::from),
                        ..prev_snapshot.merge.clone()
                    },
                    false,
                ));
            }

            current_conflicted_paths
        } else {
            prev_snapshot.merge.conflicted_paths.clone()
        };
        let details = MergeDetails {
            conflicted_paths,
            message: message.map(SharedString::from),
            heads,
        };
        Ok((details, merge_heads_changed))
    }
}
3748
3749impl Repository {
3750 pub fn snapshot(&self) -> RepositorySnapshot {
3751 self.snapshot.clone()
3752 }
3753
3754 pub fn pending_ops(&self) -> impl Iterator<Item = PendingOps> + '_ {
3755 self.pending_ops.iter().cloned()
3756 }
3757
3758 pub fn pending_ops_summary(&self) -> PathSummary<PendingOpsSummary> {
3759 self.pending_ops.summary().clone()
3760 }
3761
3762 pub fn pending_ops_for_path(&self, path: &RepoPath) -> Option<PendingOps> {
3763 self.pending_ops
3764 .get(&PathKey(path.as_ref().clone()), ())
3765 .cloned()
3766 }
3767
    /// Constructs a `Repository` backed by a local git repository on disk.
    ///
    /// Repository state is initialized asynchronously; the worker and the
    /// stored `repository_state` both wait on the same shared task. Cached
    /// commit-graph data is invalidated whenever the branch or merge heads
    /// change after the initial scan.
    fn local(
        id: RepositoryId,
        work_directory_abs_path: Arc<Path>,
        dot_git_abs_path: Arc<Path>,
        project_environment: WeakEntity<ProjectEnvironment>,
        fs: Arc<dyn Fs>,
        git_store: WeakEntity<GitStore>,
        cx: &mut Context<Self>,
    ) -> Self {
        let snapshot =
            RepositorySnapshot::empty(id, work_directory_abs_path.clone(), PathStyle::local());
        // Errors are stringified so the shared future's output is cloneable.
        let state = cx
            .spawn(async move |_, cx| {
                LocalRepositoryState::new(
                    work_directory_abs_path,
                    dot_git_abs_path,
                    project_environment,
                    fs,
                    cx,
                )
                .await
                .map_err(|err| err.to_string())
            })
            .shared();
        let job_sender = Repository::spawn_local_git_worker(state.clone(), cx);
        // Wrap the raw state task in the `RepositoryState` enum for storage.
        let state = cx
            .spawn(async move |_, _| {
                let state = state.await?;
                Ok(RepositoryState::Local(state))
            })
            .shared();

        cx.subscribe_self(move |this, event: &RepositoryEvent, _| match event {
            RepositoryEvent::BranchChanged | RepositoryEvent::MergeHeadsChanged => {
                // Only invalidate after the initial scan (scan_id > 1).
                if this.scan_id > 1 {
                    this.initial_graph_data.clear();
                }
            }
            _ => {}
        })
        .detach();

        Repository {
            this: cx.weak_entity(),
            git_store,
            snapshot,
            pending_ops: Default::default(),
            repository_state: state,
            commit_message_buffer: None,
            askpass_delegates: Default::default(),
            paths_needing_status_update: Default::default(),
            latest_askpass_id: 0,
            job_sender,
            job_id: 0,
            active_jobs: Default::default(),
            initial_graph_data: Default::default(),
            commit_data: Default::default(),
            graph_commit_data_handler: GraphCommitHandlerState::Closed,
        }
    }
3828
3829 fn remote(
3830 id: RepositoryId,
3831 work_directory_abs_path: Arc<Path>,
3832 path_style: PathStyle,
3833 project_id: ProjectId,
3834 client: AnyProtoClient,
3835 git_store: WeakEntity<GitStore>,
3836 cx: &mut Context<Self>,
3837 ) -> Self {
3838 let snapshot = RepositorySnapshot::empty(id, work_directory_abs_path, path_style);
3839 let repository_state = RemoteRepositoryState { project_id, client };
3840 let job_sender = Self::spawn_remote_git_worker(repository_state.clone(), cx);
3841 let repository_state = Task::ready(Ok(RepositoryState::Remote(repository_state))).shared();
3842 Self {
3843 this: cx.weak_entity(),
3844 snapshot,
3845 commit_message_buffer: None,
3846 git_store,
3847 pending_ops: Default::default(),
3848 paths_needing_status_update: Default::default(),
3849 job_sender,
3850 repository_state,
3851 askpass_delegates: Default::default(),
3852 latest_askpass_id: 0,
3853 active_jobs: Default::default(),
3854 job_id: 0,
3855 initial_graph_data: Default::default(),
3856 commit_data: Default::default(),
3857 graph_commit_data_handler: GraphCommitHandlerState::Closed,
3858 }
3859 }
3860
3861 pub fn git_store(&self) -> Option<Entity<GitStore>> {
3862 self.git_store.upgrade()
3863 }
3864
    /// Re-reads the index and HEAD base texts for every open buffer belonging
    /// to this repository that has a live diff, then applies any detected
    /// base-text changes locally and forwards them to the downstream client.
    ///
    /// Runs as a keyed job (`GitJobKey::ReloadBufferDiffBases`) on the git
    /// worker; only meaningful for local repositories.
    fn reload_buffer_diff_bases(&mut self, cx: &mut Context<Self>) {
        let this = cx.weak_entity();
        let git_store = self.git_store.clone();
        let _ = self.send_keyed_job(
            Some(GitJobKey::ReloadBufferDiffBases),
            None,
            |state, mut cx| async move {
                let RepositoryState::Local(LocalRepositoryState { backend, .. }) = state else {
                    log::error!("tried to recompute diffs for a non-local repository");
                    return Ok(());
                };

                let Some(this) = this.upgrade() else {
                    return Ok(());
                };

                // Gather, per tracked buffer in this repository, which base
                // texts (index and/or head) still have live diffs to refresh.
                let repo_diff_state_updates = this.update(&mut cx, |this, cx| {
                    git_store.update(cx, |git_store, cx| {
                        git_store
                            .diffs
                            .iter()
                            .filter_map(|(buffer_id, diff_state)| {
                                let buffer_store = git_store.buffer_store.read(cx);
                                let buffer = buffer_store.get(*buffer_id)?;
                                let file = File::from_dyn(buffer.read(cx).file())?;
                                let abs_path = file.worktree.read(cx).absolutize(&file.path);
                                let repo_path = this.abs_path_to_repo_path(&abs_path)?;
                                log::debug!(
                                    "start reload diff bases for repo path {}",
                                    repo_path.as_unix_str()
                                );
                                diff_state.update(cx, |diff_state, _| {
                                    let has_unstaged_diff = diff_state
                                        .unstaged_diff
                                        .as_ref()
                                        .is_some_and(|diff| diff.is_upgradable());
                                    let has_uncommitted_diff = diff_state
                                        .uncommitted_diff
                                        .as_ref()
                                        .is_some_and(|set| set.is_upgradable());

                                    Some((
                                        buffer,
                                        repo_path,
                                        has_unstaged_diff.then(|| diff_state.index_text.clone()),
                                        has_uncommitted_diff.then(|| diff_state.head_text.clone()),
                                    ))
                                })
                            })
                            .collect::<Vec<_>>()
                    })
                })?;

                // Load fresh base texts off the main thread and reduce each
                // buffer's old/new pair into a minimal `DiffBasesChange`
                // (`None` when nothing actually changed).
                let buffer_diff_base_changes = cx
                    .background_spawn(async move {
                        let mut changes = Vec::new();
                        for (buffer, repo_path, current_index_text, current_head_text) in
                            &repo_diff_state_updates
                        {
                            let index_text = if current_index_text.is_some() {
                                backend.load_index_text(repo_path.clone()).await
                            } else {
                                None
                            };
                            let head_text = if current_head_text.is_some() {
                                backend.load_committed_text(repo_path.clone()).await
                            } else {
                                None
                            };

                            let change =
                                match (current_index_text.as_ref(), current_head_text.as_ref()) {
                                    (Some(current_index), Some(current_head)) => {
                                        let index_changed =
                                            index_text.as_deref() != current_index.as_deref();
                                        let head_changed =
                                            head_text.as_deref() != current_head.as_deref();
                                        if index_changed && head_changed {
                                            if index_text == head_text {
                                                Some(DiffBasesChange::SetBoth(head_text))
                                            } else {
                                                Some(DiffBasesChange::SetEach {
                                                    index: index_text,
                                                    head: head_text,
                                                })
                                            }
                                        } else if index_changed {
                                            Some(DiffBasesChange::SetIndex(index_text))
                                        } else if head_changed {
                                            Some(DiffBasesChange::SetHead(head_text))
                                        } else {
                                            None
                                        }
                                    }
                                    (Some(current_index), None) => {
                                        let index_changed =
                                            index_text.as_deref() != current_index.as_deref();
                                        index_changed
                                            .then_some(DiffBasesChange::SetIndex(index_text))
                                    }
                                    (None, Some(current_head)) => {
                                        let head_changed =
                                            head_text.as_deref() != current_head.as_deref();
                                        head_changed.then_some(DiffBasesChange::SetHead(head_text))
                                    }
                                    (None, None) => None,
                                };

                            changes.push((buffer.clone(), change))
                        }
                        changes
                    })
                    .await;

                // Apply each change, mirroring it to the downstream client
                // (when connected) before updating the local diff state.
                git_store.update(&mut cx, |git_store, cx| {
                    for (buffer, diff_bases_change) in buffer_diff_base_changes {
                        let buffer_snapshot = buffer.read(cx).text_snapshot();
                        let buffer_id = buffer_snapshot.remote_id();
                        let Some(diff_state) = git_store.diffs.get(&buffer_id) else {
                            continue;
                        };

                        let downstream_client = git_store.downstream_client();
                        diff_state.update(cx, |diff_state, cx| {
                            use proto::update_diff_bases::Mode;

                            if let Some((diff_bases_change, (client, project_id))) =
                                diff_bases_change.clone().zip(downstream_client)
                            {
                                let (staged_text, committed_text, mode) = match diff_bases_change {
                                    DiffBasesChange::SetIndex(index) => {
                                        (index, None, Mode::IndexOnly)
                                    }
                                    DiffBasesChange::SetHead(head) => (None, head, Mode::HeadOnly),
                                    DiffBasesChange::SetEach { index, head } => {
                                        (index, head, Mode::IndexAndHead)
                                    }
                                    DiffBasesChange::SetBoth(text) => {
                                        (None, text, Mode::IndexMatchesHead)
                                    }
                                };
                                client
                                    .send(proto::UpdateDiffBases {
                                        project_id: project_id.to_proto(),
                                        buffer_id: buffer_id.to_proto(),
                                        staged_text,
                                        committed_text,
                                        mode: mode as i32,
                                    })
                                    .log_err();
                            }

                            diff_state.diff_bases_changed(buffer_snapshot, diff_bases_change, cx);
                        });
                    }
                })
            },
        );
    }
4024
4025 pub fn send_job<F, Fut, R>(
4026 &mut self,
4027 status: Option<SharedString>,
4028 job: F,
4029 ) -> oneshot::Receiver<R>
4030 where
4031 F: FnOnce(RepositoryState, AsyncApp) -> Fut + 'static,
4032 Fut: Future<Output = R> + 'static,
4033 R: Send + 'static,
4034 {
4035 self.send_keyed_job(None, status, job)
4036 }
4037
    /// Enqueues a job on this repository's git worker queue.
    ///
    /// `key`, when present, identifies related jobs (its interpretation is up
    /// to the worker loop — NOTE(review): presumably used for deduplication;
    /// confirm against `spawn_local_git_worker`). When `status` is present the
    /// job is recorded in `active_jobs` with its start time and message while
    /// it runs, so observers can display progress. The returned receiver
    /// yields the job's result; it is cancelled if the job is never run.
    fn send_keyed_job<F, Fut, R>(
        &mut self,
        key: Option<GitJobKey>,
        status: Option<SharedString>,
        job: F,
    ) -> oneshot::Receiver<R>
    where
        F: FnOnce(RepositoryState, AsyncApp) -> Fut + 'static,
        Fut: Future<Output = R> + 'static,
        R: Send + 'static,
    {
        let (result_tx, result_rx) = futures::channel::oneshot::channel();
        // Monotonic id used to track this job in `active_jobs`.
        let job_id = post_inc(&mut self.job_id);
        let this = self.this.clone();
        self.job_sender
            .unbounded_send(GitJob {
                key,
                job: Box::new(move |state, cx: &mut AsyncApp| {
                    let job = job(state, cx.clone());
                    cx.spawn(async move |cx| {
                        // Register the job as active for the duration of its
                        // run so the UI can show what the repository is doing.
                        if let Some(s) = status.clone() {
                            this.update(cx, |this, cx| {
                                this.active_jobs.insert(
                                    job_id,
                                    JobInfo {
                                        start: Instant::now(),
                                        message: s.clone(),
                                    },
                                );

                                cx.notify();
                            })
                            .ok();
                        }
                        let result = job.await;

                        this.update(cx, |this, cx| {
                            this.active_jobs.remove(&job_id);
                            cx.notify();
                        })
                        .ok();

                        // The receiver may have been dropped; ignore that.
                        result_tx.send(result).ok();
                    })
                }),
            })
            .ok();
        result_rx
    }
4087
4088 pub fn set_as_active_repository(&self, cx: &mut Context<Self>) {
4089 let Some(git_store) = self.git_store.upgrade() else {
4090 return;
4091 };
4092 let entity = cx.entity();
4093 git_store.update(cx, |git_store, cx| {
4094 let Some((&id, _)) = git_store
4095 .repositories
4096 .iter()
4097 .find(|(_, handle)| *handle == &entity)
4098 else {
4099 return;
4100 };
4101 git_store.active_repo_id = Some(id);
4102 cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
4103 });
4104 }
4105
4106 pub fn cached_status(&self) -> impl '_ + Iterator<Item = StatusEntry> {
4107 self.snapshot.status()
4108 }
4109
4110 pub fn cached_stash(&self) -> GitStash {
4111 self.snapshot.stash_entries.clone()
4112 }
4113
4114 pub fn repo_path_to_project_path(&self, path: &RepoPath, cx: &App) -> Option<ProjectPath> {
4115 let git_store = self.git_store.upgrade()?;
4116 let worktree_store = git_store.read(cx).worktree_store.read(cx);
4117 let abs_path = self.snapshot.repo_path_to_abs_path(path);
4118 let abs_path = SanitizedPath::new(&abs_path);
4119 let (worktree, relative_path) = worktree_store.find_worktree(abs_path, cx)?;
4120 Some(ProjectPath {
4121 worktree_id: worktree.read(cx).id(),
4122 path: relative_path,
4123 })
4124 }
4125
4126 pub fn project_path_to_repo_path(&self, path: &ProjectPath, cx: &App) -> Option<RepoPath> {
4127 let git_store = self.git_store.upgrade()?;
4128 let worktree_store = git_store.read(cx).worktree_store.read(cx);
4129 let abs_path = worktree_store.absolutize(path, cx)?;
4130 self.snapshot.abs_path_to_repo_path(&abs_path)
4131 }
4132
4133 pub fn contains_sub_repo(&self, other: &Entity<Self>, cx: &App) -> bool {
4134 other
4135 .read(cx)
4136 .snapshot
4137 .work_directory_abs_path
4138 .starts_with(&self.snapshot.work_directory_abs_path)
4139 }
4140
    /// Returns the repository's commit-message buffer, creating it on first
    /// use and caching it in `commit_message_buffer`.
    ///
    /// For local repositories a fresh buffer is created; for remote ones the
    /// buffer is opened on the host over RPC and mirrored here, with the
    /// "Git Commit" language applied when a registry is available.
    pub fn open_commit_buffer(
        &mut self,
        languages: Option<Arc<LanguageRegistry>>,
        buffer_store: Entity<BufferStore>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<Buffer>>> {
        let id = self.id;
        // Reuse the cached buffer when one was already created.
        if let Some(buffer) = self.commit_message_buffer.clone() {
            return Task::ready(Ok(buffer));
        }
        let this = cx.weak_entity();

        let rx = self.send_job(None, move |state, mut cx| async move {
            let Some(this) = this.upgrade() else {
                bail!("git store was dropped");
            };
            match state {
                RepositoryState::Local(..) => {
                    this.update(&mut cx, |_, cx| {
                        Self::open_local_commit_buffer(languages, buffer_store, cx)
                    })
                    .await
                }
                RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
                    let request = client.request(proto::OpenCommitMessageBuffer {
                        project_id: project_id.0,
                        repository_id: id.to_proto(),
                    });
                    let response = request.await.context("requesting to open commit buffer")?;
                    let buffer_id = BufferId::new(response.buffer_id)?;
                    // Wait until the host's buffer has been replicated here.
                    let buffer = buffer_store
                        .update(&mut cx, |buffer_store, cx| {
                            buffer_store.wait_for_remote_buffer(buffer_id, cx)
                        })
                        .await?;
                    if let Some(language_registry) = languages {
                        let git_commit_language =
                            language_registry.language_for_name("Git Commit").await?;
                        buffer.update(&mut cx, |buffer, cx| {
                            buffer.set_language(Some(git_commit_language), cx);
                        });
                    }
                    this.update(&mut cx, |this, _| {
                        this.commit_message_buffer = Some(buffer.clone());
                    });
                    Ok(buffer)
                }
            }
        });

        cx.spawn(|_, _: &mut AsyncApp| async move { rx.await? })
    }
4193
4194 fn open_local_commit_buffer(
4195 language_registry: Option<Arc<LanguageRegistry>>,
4196 buffer_store: Entity<BufferStore>,
4197 cx: &mut Context<Self>,
4198 ) -> Task<Result<Entity<Buffer>>> {
4199 cx.spawn(async move |repository, cx| {
4200 let git_commit_language = match language_registry {
4201 Some(language_registry) => {
4202 Some(language_registry.language_for_name("Git Commit").await?)
4203 }
4204 None => None,
4205 };
4206 let buffer = buffer_store
4207 .update(cx, |buffer_store, cx| {
4208 buffer_store.create_buffer(git_commit_language, false, cx)
4209 })
4210 .await?;
4211
4212 repository.update(cx, |repository, _| {
4213 repository.commit_message_buffer = Some(buffer.clone());
4214 })?;
4215 Ok(buffer)
4216 })
4217 }
4218
4219 pub fn checkout_files(
4220 &mut self,
4221 commit: &str,
4222 paths: Vec<RepoPath>,
4223 cx: &mut Context<Self>,
4224 ) -> Task<Result<()>> {
4225 let commit = commit.to_string();
4226 let id = self.id;
4227
4228 self.spawn_job_with_tracking(
4229 paths.clone(),
4230 pending_op::GitStatus::Reverted,
4231 cx,
4232 async move |this, cx| {
4233 this.update(cx, |this, _cx| {
4234 this.send_job(
4235 Some(format!("git checkout {}", commit).into()),
4236 move |git_repo, _| async move {
4237 match git_repo {
4238 RepositoryState::Local(LocalRepositoryState {
4239 backend,
4240 environment,
4241 ..
4242 }) => {
4243 backend
4244 .checkout_files(commit, paths, environment.clone())
4245 .await
4246 }
4247 RepositoryState::Remote(RemoteRepositoryState {
4248 project_id,
4249 client,
4250 }) => {
4251 client
4252 .request(proto::GitCheckoutFiles {
4253 project_id: project_id.0,
4254 repository_id: id.to_proto(),
4255 commit,
4256 paths: paths
4257 .into_iter()
4258 .map(|p| p.to_proto())
4259 .collect(),
4260 })
4261 .await?;
4262
4263 Ok(())
4264 }
4265 }
4266 },
4267 )
4268 })?
4269 .await?
4270 },
4271 )
4272 }
4273
4274 pub fn reset(
4275 &mut self,
4276 commit: String,
4277 reset_mode: ResetMode,
4278 _cx: &mut App,
4279 ) -> oneshot::Receiver<Result<()>> {
4280 let id = self.id;
4281
4282 self.send_job(None, move |git_repo, _| async move {
4283 match git_repo {
4284 RepositoryState::Local(LocalRepositoryState {
4285 backend,
4286 environment,
4287 ..
4288 }) => backend.reset(commit, reset_mode, environment).await,
4289 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
4290 client
4291 .request(proto::GitReset {
4292 project_id: project_id.0,
4293 repository_id: id.to_proto(),
4294 commit,
4295 mode: match reset_mode {
4296 ResetMode::Soft => git_reset::ResetMode::Soft.into(),
4297 ResetMode::Mixed => git_reset::ResetMode::Mixed.into(),
4298 },
4299 })
4300 .await?;
4301
4302 Ok(())
4303 }
4304 }
4305 })
4306 }
4307
4308 pub fn show(&mut self, commit: String) -> oneshot::Receiver<Result<CommitDetails>> {
4309 let id = self.id;
4310 self.send_job(None, move |git_repo, _cx| async move {
4311 match git_repo {
4312 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
4313 backend.show(commit).await
4314 }
4315 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
4316 let resp = client
4317 .request(proto::GitShow {
4318 project_id: project_id.0,
4319 repository_id: id.to_proto(),
4320 commit,
4321 })
4322 .await?;
4323
4324 Ok(CommitDetails {
4325 sha: resp.sha.into(),
4326 message: resp.message.into(),
4327 commit_timestamp: resp.commit_timestamp,
4328 author_email: resp.author_email.into(),
4329 author_name: resp.author_name.into(),
4330 })
4331 }
4332 }
4333 })
4334 }
4335
4336 pub fn load_commit_diff(&mut self, commit: String) -> oneshot::Receiver<Result<CommitDiff>> {
4337 let id = self.id;
4338 self.send_job(None, move |git_repo, cx| async move {
4339 match git_repo {
4340 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
4341 backend.load_commit(commit, cx).await
4342 }
4343 RepositoryState::Remote(RemoteRepositoryState {
4344 client, project_id, ..
4345 }) => {
4346 let response = client
4347 .request(proto::LoadCommitDiff {
4348 project_id: project_id.0,
4349 repository_id: id.to_proto(),
4350 commit,
4351 })
4352 .await?;
4353 Ok(CommitDiff {
4354 files: response
4355 .files
4356 .into_iter()
4357 .map(|file| {
4358 Ok(CommitFile {
4359 path: RepoPath::from_proto(&file.path)?,
4360 old_text: file.old_text,
4361 new_text: file.new_text,
4362 is_binary: file.is_binary,
4363 })
4364 })
4365 .collect::<Result<Vec<_>>>()?,
4366 })
4367 }
4368 }
4369 })
4370 }
4371
    /// Convenience wrapper around [`Self::file_history_paginated`] that
    /// requests the entire history for `path` (no offset, no limit).
    pub fn file_history(
        &mut self,
        path: RepoPath,
    ) -> oneshot::Receiver<Result<git::repository::FileHistory>> {
        self.file_history_paginated(path, 0, None)
    }
4378
4379 pub fn file_history_paginated(
4380 &mut self,
4381 path: RepoPath,
4382 skip: usize,
4383 limit: Option<usize>,
4384 ) -> oneshot::Receiver<Result<git::repository::FileHistory>> {
4385 let id = self.id;
4386 self.send_job(None, move |git_repo, _cx| async move {
4387 match git_repo {
4388 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
4389 backend.file_history_paginated(path, skip, limit).await
4390 }
4391 RepositoryState::Remote(RemoteRepositoryState { client, project_id }) => {
4392 let response = client
4393 .request(proto::GitFileHistory {
4394 project_id: project_id.0,
4395 repository_id: id.to_proto(),
4396 path: path.to_proto(),
4397 skip: skip as u64,
4398 limit: limit.map(|l| l as u64),
4399 })
4400 .await?;
4401 Ok(git::repository::FileHistory {
4402 entries: response
4403 .entries
4404 .into_iter()
4405 .map(|entry| git::repository::FileHistoryEntry {
4406 sha: entry.sha.into(),
4407 subject: entry.subject.into(),
4408 message: entry.message.into(),
4409 commit_timestamp: entry.commit_timestamp,
4410 author_name: entry.author_name.into(),
4411 author_email: entry.author_email.into(),
4412 })
4413 .collect(),
4414 path: RepoPath::from_proto(&response.path)?,
4415 })
4416 }
4417 }
4418 })
4419 }
4420
4421 pub fn get_graph_data(
4422 &self,
4423 log_source: LogSource,
4424 log_order: LogOrder,
4425 ) -> Option<&InitialGitGraphData> {
4426 self.initial_graph_data.get(&(log_source, log_order))
4427 }
4428
    /// Returns the commits in `range` for the given (source, order) graph,
    /// kicking off a background fetch the first time a combination is
    /// requested. While the fetch is running, `is_loading` is true and
    /// `commits` covers only what has streamed in so far.
    pub fn graph_data(
        &mut self,
        log_source: LogSource,
        log_order: LogOrder,
        range: Range<usize>,
        cx: &mut Context<Self>,
    ) -> GraphDataResponse<'_> {
        let initial_commit_data = self
            .initial_graph_data
            .entry((log_source.clone(), log_order))
            .or_insert_with(|| {
                let state = self.repository_state.clone();
                let log_source = log_source.clone();

                // The fetch task streams commits into this entry; dropping the
                // entry drops the task and cancels the fetch.
                let fetch_task = cx.spawn(async move |repository, cx| {
                    let state = state.await;
                    let result = match state {
                        Ok(RepositoryState::Local(LocalRepositoryState { backend, .. })) => {
                            Self::local_git_graph_data(
                                repository.clone(),
                                backend,
                                log_source.clone(),
                                log_order,
                                cx,
                            )
                            .await
                        }
                        Ok(RepositoryState::Remote(_)) => {
                            Err("Git graph is not supported for collab yet".into())
                        }
                        Err(e) => Err(SharedString::from(e)),
                    };

                    // Surface fetch failures on the cached entry so the UI can
                    // render them.
                    if let Err(fetch_task_error) = result {
                        repository
                            .update(cx, |repository, _| {
                                if let Some(data) = repository
                                    .initial_graph_data
                                    .get_mut(&(log_source, log_order))
                                {
                                    data.error = Some(fetch_task_error);
                                } else {
                                    debug_panic!(
                                        "This task would be dropped if this entry doesn't exist"
                                    );
                                }
                            })
                            .ok();
                    }
                });

                InitialGitGraphData {
                    fetch_task,
                    error: None,
                    commit_data: Vec::new(),
                    commit_oid_to_index: HashMap::default(),
                }
            });

        // Clamp the requested range to what has been loaded. NOTE(review): a
        // `range.start` past the end clamps to `len - 1`, so an out-of-range
        // request still yields the final commit — presumably intentional for
        // scroll-past-end behavior; confirm.
        let max_start = initial_commit_data.commit_data.len().saturating_sub(1);
        let max_end = initial_commit_data.commit_data.len();

        GraphDataResponse {
            commits: &initial_commit_data.commit_data
                [range.start.min(max_start)..range.end.min(max_end)],
            is_loading: !initial_commit_data.fetch_task.is_ready(),
            error: initial_commit_data.error.clone(),
        }
    }
4498
    /// Streams initial graph data for a local repository: the backend pushes
    /// batches of commits through a channel on a background task, and each
    /// batch is appended to the cached `initial_graph_data` entry on the
    /// foreground, emitting a `GraphEvent` per commit appended.
    async fn local_git_graph_data(
        this: WeakEntity<Self>,
        backend: Arc<dyn GitRepository>,
        log_source: LogSource,
        log_order: LogOrder,
        cx: &mut AsyncApp,
    ) -> Result<(), SharedString> {
        let (request_tx, request_rx) =
            smol::channel::unbounded::<Vec<Arc<InitialGraphCommitData>>>();

        // Producer: runs the git traversal off the main thread; closing
        // `request_tx` ends the consumer loop below.
        let task = cx.background_executor().spawn({
            let log_source = log_source.clone();
            async move {
                backend
                    .initial_graph_data(log_source, log_order, request_tx)
                    .await
                    .map_err(|err| SharedString::from(err.to_string()))
            }
        });

        let graph_data_key = (log_source, log_order);

        // Consumer: fold each batch into the cached entry, tracking the
        // oid -> index mapping alongside the flat commit list.
        while let Ok(initial_graph_commit_data) = request_rx.recv().await {
            this.update(cx, |repository, cx| {
                let graph_data = repository
                    .initial_graph_data
                    .entry(graph_data_key.clone())
                    .and_modify(|graph_data| {
                        for commit_data in initial_graph_commit_data {
                            graph_data
                                .commit_oid_to_index
                                .insert(commit_data.sha, graph_data.commit_data.len());
                            graph_data.commit_data.push(commit_data);

                            cx.emit(RepositoryEvent::GraphEvent(
                                graph_data_key.clone(),
                                GitGraphEvent::CountUpdated(graph_data.commit_data.len()),
                            ));
                        }
                    });

                // The entry is created before this task is spawned, so a
                // vacant entry here indicates a logic error.
                match &graph_data {
                    Entry::Occupied(_) => {}
                    Entry::Vacant(_) => {
                        debug_panic!("This task should be dropped if data doesn't exist");
                    }
                }
            })
            .ok();
        }

        // Propagate any traversal error from the producer.
        task.await?;
        Ok(())
    }
4553
    /// Requests detailed data for a single commit, returning the current
    /// load state. A cache miss enqueues the sha on the commit-data handler
    /// when it is open; callers are expected to re-invoke (e.g. on notify)
    /// until the state becomes `Loaded`.
    pub fn fetch_commit_data(&mut self, sha: Oid, cx: &mut Context<Self>) -> &CommitDataState {
        if !self.commit_data.contains_key(&sha) {
            match &self.graph_commit_data_handler {
                GraphCommitHandlerState::Open(handler) => {
                    // Only mark the sha as loading if it was actually queued.
                    if handler.commit_data_request.try_send(sha).is_ok() {
                        let old_value = self.commit_data.insert(sha, CommitDataState::Loading);
                        debug_assert!(old_value.is_none(), "We should never overwrite commit data");
                    }
                }
                GraphCommitHandlerState::Closed => {
                    // NOTE(review): this only (re)opens the handler — the sha
                    // is not queued on this call and must be requested again;
                    // confirm callers retry.
                    self.open_graph_commit_data_handler(cx);
                }
                GraphCommitHandlerState::Starting => {}
            }
        }

        // Report `Loading` for shas that couldn't be queued yet.
        self.commit_data
            .get(&sha)
            .unwrap_or(&CommitDataState::Loading)
    }
4574
    /// Spins up the pair of tasks backing commit-data lookups: a background
    /// task that owns the backend reader and serves sha requests (idling out
    /// after 10s without traffic), and a foreground task that folds results
    /// into `self.commit_data` and notifies observers.
    fn open_graph_commit_data_handler(&mut self, cx: &mut Context<Self>) {
        self.graph_commit_data_handler = GraphCommitHandlerState::Starting;

        let state = self.repository_state.clone();
        let (result_tx, result_rx) = smol::channel::bounded::<(Oid, GraphCommitData)>(64);
        let (request_tx, request_rx) = smol::channel::unbounded::<Oid>();

        // Foreground: drain results into the cache; when the result channel
        // closes (background task exited) flip the handler back to Closed.
        let foreground_task = cx.spawn(async move |this, cx| {
            while let Ok((sha, commit_data)) = result_rx.recv().await {
                let result = this.update(cx, |this, cx| {
                    let old_value = this
                        .commit_data
                        .insert(sha, CommitDataState::Loaded(Arc::new(commit_data)));
                    debug_assert!(
                        !matches!(old_value, Some(CommitDataState::Loaded(_))),
                        "We should never overwrite commit data"
                    );

                    cx.notify();
                });
                if result.is_err() {
                    break;
                }
            }

            this.update(cx, |this, _cx| {
                this.graph_commit_data_handler = GraphCommitHandlerState::Closed;
            })
            .ok();
        });

        let request_tx_for_handler = request_tx;
        let background_executor = cx.background_executor().clone();

        // Background: resolve the repository backend, then serve sha requests
        // until the request channel closes or we go 10s without a request.
        cx.background_spawn(async move {
            let backend = match state.await {
                Ok(RepositoryState::Local(LocalRepositoryState { backend, .. })) => backend,
                Ok(RepositoryState::Remote(_)) => {
                    log::error!("commit_data_reader not supported for remote repositories");
                    return;
                }
                Err(error) => {
                    log::error!("failed to get repository state: {error}");
                    return;
                }
            };

            let reader = match backend.commit_data_reader() {
                Ok(reader) => reader,
                Err(error) => {
                    log::error!("failed to create commit data reader: {error:?}");
                    return;
                }
            };

            loop {
                // Timer is re-armed every iteration, so the 10s idle window
                // restarts after each served request.
                let timeout = background_executor.timer(std::time::Duration::from_secs(10));

                futures::select_biased! {
                    sha = futures::FutureExt::fuse(request_rx.recv()) => {
                        let Ok(sha) = sha else {
                            break;
                        };

                        match reader.read(sha).await {
                            Ok(commit_data) => {
                                if result_tx.send((sha, commit_data)).await.is_err() {
                                    break;
                                }
                            }
                            Err(error) => {
                                log::error!("failed to read commit data for {sha}: {error:?}");
                            }
                        }
                    }
                    _ = futures::FutureExt::fuse(timeout) => {
                        break;
                    }
                }
            }

            // Closing the result channel lets the foreground task mark the
            // handler Closed.
            drop(result_tx);
        })
        .detach();

        self.graph_commit_data_handler = GraphCommitHandlerState::Open(GraphCommitDataHandler {
            _task: foreground_task,
            commit_data_request: request_tx_for_handler,
        });
    }
4665
4666 fn buffer_store(&self, cx: &App) -> Option<Entity<BufferStore>> {
4667 Some(self.git_store.upgrade()?.read(cx).buffer_store.clone())
4668 }
4669
4670 fn save_buffers<'a>(
4671 &self,
4672 entries: impl IntoIterator<Item = &'a RepoPath>,
4673 cx: &mut Context<Self>,
4674 ) -> Vec<Task<anyhow::Result<()>>> {
4675 let mut save_futures = Vec::new();
4676 if let Some(buffer_store) = self.buffer_store(cx) {
4677 buffer_store.update(cx, |buffer_store, cx| {
4678 for path in entries {
4679 let Some(project_path) = self.repo_path_to_project_path(path, cx) else {
4680 continue;
4681 };
4682 if let Some(buffer) = buffer_store.get_by_path(&project_path)
4683 && buffer
4684 .read(cx)
4685 .file()
4686 .is_some_and(|file| file.disk_state().exists())
4687 && buffer.read(cx).has_unsaved_edits()
4688 {
4689 save_futures.push(buffer_store.save_buffer(buffer, cx));
4690 }
4691 }
4692 })
4693 }
4694 save_futures
4695 }
4696
    /// Stages the given entries (`git add`); see
    /// [`Self::stage_or_unstage_entries`] for the details.
    pub fn stage_entries(
        &mut self,
        entries: Vec<RepoPath>,
        cx: &mut Context<Self>,
    ) -> Task<anyhow::Result<()>> {
        self.stage_or_unstage_entries(true, entries, cx)
    }
4704
    /// Unstages the given entries (`git reset`); see
    /// [`Self::stage_or_unstage_entries`] for the details.
    pub fn unstage_entries(
        &mut self,
        entries: Vec<RepoPath>,
        cx: &mut Context<Self>,
    ) -> Task<anyhow::Result<()>> {
        self.stage_or_unstage_entries(false, entries, cx)
    }
4712
    /// Stages or unstages `entries`: saves any dirty open buffers first, then
    /// optimistically flips all hunks in the affected uncommitted diffs, and
    /// finally writes the index (locally or via RPC). On failure the pending
    /// hunk state is rolled back. Keyed on the entry list so writes to the
    /// same paths are serialized.
    fn stage_or_unstage_entries(
        &mut self,
        stage: bool,
        entries: Vec<RepoPath>,
        cx: &mut Context<Self>,
    ) -> Task<anyhow::Result<()>> {
        if entries.is_empty() {
            return Task::ready(Ok(()));
        }
        let Some(git_store) = self.git_store.upgrade() else {
            return Task::ready(Ok(()));
        };
        let id = self.id;
        let save_tasks = self.save_buffers(&entries, cx);
        let paths = entries
            .iter()
            .map(|p| p.as_unix_str())
            .collect::<Vec<_>>()
            .join(" ");
        let status = if stage {
            format!("git add {paths}")
        } else {
            format!("git reset {paths}")
        };
        let job_key = GitJobKey::WriteIndex(entries.clone());

        self.spawn_job_with_tracking(
            entries.clone(),
            if stage {
                pending_op::GitStatus::Staged
            } else {
                pending_op::GitStatus::Unstaged
            },
            cx,
            async move |this, cx| {
                // Flush unsaved edits so the index write sees current content.
                for save_task in save_tasks {
                    save_task.await?;
                }

                this.update(cx, |this, cx| {
                    let weak_this = cx.weak_entity();
                    this.send_keyed_job(
                        Some(job_key),
                        Some(status.into()),
                        move |git_repo, mut cx| async move {
                            // Optimistically stage/unstage every hunk in each
                            // entry's uncommitted diff, recording the per-diff
                            // operation count so we can tell later whether this
                            // write is still the latest one.
                            let hunk_staging_operation_counts = weak_this
                                .update(&mut cx, |this, cx| {
                                    let mut hunk_staging_operation_counts = HashMap::default();
                                    for path in &entries {
                                        let Some(project_path) =
                                            this.repo_path_to_project_path(path, cx)
                                        else {
                                            continue;
                                        };
                                        let Some(buffer) = git_store
                                            .read(cx)
                                            .buffer_store
                                            .read(cx)
                                            .get_by_path(&project_path)
                                        else {
                                            continue;
                                        };
                                        let Some(diff_state) = git_store
                                            .read(cx)
                                            .diffs
                                            .get(&buffer.read(cx).remote_id())
                                            .cloned()
                                        else {
                                            continue;
                                        };
                                        let Some(uncommitted_diff) =
                                            diff_state.read(cx).uncommitted_diff.as_ref().and_then(
                                                |uncommitted_diff| uncommitted_diff.upgrade(),
                                            )
                                        else {
                                            continue;
                                        };
                                        let buffer_snapshot = buffer.read(cx).text_snapshot();
                                        let file_exists = buffer
                                            .read(cx)
                                            .file()
                                            .is_some_and(|file| file.disk_state().exists());
                                        let hunk_staging_operation_count =
                                            diff_state.update(cx, |diff_state, cx| {
                                                uncommitted_diff.update(
                                                    cx,
                                                    |uncommitted_diff, cx| {
                                                        uncommitted_diff
                                                            .stage_or_unstage_all_hunks(
                                                                stage,
                                                                &buffer_snapshot,
                                                                file_exists,
                                                                cx,
                                                            );
                                                    },
                                                );

                                                diff_state.hunk_staging_operation_count += 1;
                                                diff_state.hunk_staging_operation_count
                                            });
                                        hunk_staging_operation_counts.insert(
                                            diff_state.downgrade(),
                                            hunk_staging_operation_count,
                                        );
                                    }
                                    hunk_staging_operation_counts
                                })
                                .unwrap_or_default();

                            // Perform the actual index write.
                            let result = match git_repo {
                                RepositoryState::Local(LocalRepositoryState {
                                    backend,
                                    environment,
                                    ..
                                }) => {
                                    if stage {
                                        backend.stage_paths(entries, environment.clone()).await
                                    } else {
                                        backend.unstage_paths(entries, environment.clone()).await
                                    }
                                }
                                RepositoryState::Remote(RemoteRepositoryState {
                                    project_id,
                                    client,
                                }) => {
                                    if stage {
                                        client
                                            .request(proto::Stage {
                                                project_id: project_id.0,
                                                repository_id: id.to_proto(),
                                                paths: entries
                                                    .into_iter()
                                                    .map(|repo_path| repo_path.to_proto())
                                                    .collect(),
                                            })
                                            .await
                                            .context("sending stage request")
                                            .map(|_| ())
                                    } else {
                                        client
                                            .request(proto::Unstage {
                                                project_id: project_id.0,
                                                repository_id: id.to_proto(),
                                                paths: entries
                                                    .into_iter()
                                                    .map(|repo_path| repo_path.to_proto())
                                                    .collect(),
                                            })
                                            .await
                                            .context("sending unstage request")
                                            .map(|_| ())
                                    }
                                }
                            };

                            // Commit the optimistic hunk state on success, or
                            // roll the pending hunks back on failure.
                            for (diff_state, hunk_staging_operation_count) in
                                hunk_staging_operation_counts
                            {
                                diff_state
                                    .update(&mut cx, |diff_state, cx| {
                                        if result.is_ok() {
                                            diff_state.hunk_staging_operation_count_as_of_write =
                                                hunk_staging_operation_count;
                                        } else if let Some(uncommitted_diff) =
                                            &diff_state.uncommitted_diff
                                        {
                                            uncommitted_diff
                                                .update(cx, |uncommitted_diff, cx| {
                                                    uncommitted_diff.clear_pending_hunks(cx);
                                                })
                                                .ok();
                                        }
                                    })
                                    .ok();
                            }

                            result
                        },
                    )
                })?
                .await?
            },
        )
    }
4897
4898 pub fn stage_all(&mut self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
4899 let to_stage = self
4900 .cached_status()
4901 .filter_map(|entry| {
4902 if let Some(ops) = self.pending_ops_for_path(&entry.repo_path) {
4903 if ops.staging() || ops.staged() {
4904 None
4905 } else {
4906 Some(entry.repo_path)
4907 }
4908 } else if entry.status.staging().is_fully_staged() {
4909 None
4910 } else {
4911 Some(entry.repo_path)
4912 }
4913 })
4914 .collect();
4915 self.stage_or_unstage_entries(true, to_stage, cx)
4916 }
4917
4918 pub fn unstage_all(&mut self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
4919 let to_unstage = self
4920 .cached_status()
4921 .filter_map(|entry| {
4922 if let Some(ops) = self.pending_ops_for_path(&entry.repo_path) {
4923 if !ops.staging() && !ops.staged() {
4924 None
4925 } else {
4926 Some(entry.repo_path)
4927 }
4928 } else if entry.status.staging().is_fully_unstaged() {
4929 None
4930 } else {
4931 Some(entry.repo_path)
4932 }
4933 })
4934 .collect();
4935 self.stage_or_unstage_entries(false, to_unstage, cx)
4936 }
4937
4938 pub fn stash_all(&mut self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
4939 let to_stash = self.cached_status().map(|entry| entry.repo_path).collect();
4940
4941 self.stash_entries(to_stash, cx)
4942 }
4943
4944 pub fn stash_entries(
4945 &mut self,
4946 entries: Vec<RepoPath>,
4947 cx: &mut Context<Self>,
4948 ) -> Task<anyhow::Result<()>> {
4949 let id = self.id;
4950
4951 cx.spawn(async move |this, cx| {
4952 this.update(cx, |this, _| {
4953 this.send_job(None, move |git_repo, _cx| async move {
4954 match git_repo {
4955 RepositoryState::Local(LocalRepositoryState {
4956 backend,
4957 environment,
4958 ..
4959 }) => backend.stash_paths(entries, environment).await,
4960 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
4961 client
4962 .request(proto::Stash {
4963 project_id: project_id.0,
4964 repository_id: id.to_proto(),
4965 paths: entries
4966 .into_iter()
4967 .map(|repo_path| repo_path.to_proto())
4968 .collect(),
4969 })
4970 .await
4971 .context("sending stash request")?;
4972 Ok(())
4973 }
4974 }
4975 })
4976 })?
4977 .await??;
4978 Ok(())
4979 })
4980 }
4981
4982 pub fn stash_pop(
4983 &mut self,
4984 index: Option<usize>,
4985 cx: &mut Context<Self>,
4986 ) -> Task<anyhow::Result<()>> {
4987 let id = self.id;
4988 cx.spawn(async move |this, cx| {
4989 this.update(cx, |this, _| {
4990 this.send_job(None, move |git_repo, _cx| async move {
4991 match git_repo {
4992 RepositoryState::Local(LocalRepositoryState {
4993 backend,
4994 environment,
4995 ..
4996 }) => backend.stash_pop(index, environment).await,
4997 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
4998 client
4999 .request(proto::StashPop {
5000 project_id: project_id.0,
5001 repository_id: id.to_proto(),
5002 stash_index: index.map(|i| i as u64),
5003 })
5004 .await
5005 .context("sending stash pop request")?;
5006 Ok(())
5007 }
5008 }
5009 })
5010 })?
5011 .await??;
5012 Ok(())
5013 })
5014 }
5015
5016 pub fn stash_apply(
5017 &mut self,
5018 index: Option<usize>,
5019 cx: &mut Context<Self>,
5020 ) -> Task<anyhow::Result<()>> {
5021 let id = self.id;
5022 cx.spawn(async move |this, cx| {
5023 this.update(cx, |this, _| {
5024 this.send_job(None, move |git_repo, _cx| async move {
5025 match git_repo {
5026 RepositoryState::Local(LocalRepositoryState {
5027 backend,
5028 environment,
5029 ..
5030 }) => backend.stash_apply(index, environment).await,
5031 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5032 client
5033 .request(proto::StashApply {
5034 project_id: project_id.0,
5035 repository_id: id.to_proto(),
5036 stash_index: index.map(|i| i as u64),
5037 })
5038 .await
5039 .context("sending stash apply request")?;
5040 Ok(())
5041 }
5042 }
5043 })
5044 })?
5045 .await??;
5046 Ok(())
5047 })
5048 }
5049
5050 pub fn stash_drop(
5051 &mut self,
5052 index: Option<usize>,
5053 cx: &mut Context<Self>,
5054 ) -> oneshot::Receiver<anyhow::Result<()>> {
5055 let id = self.id;
5056 let updates_tx = self
5057 .git_store()
5058 .and_then(|git_store| match &git_store.read(cx).state {
5059 GitStoreState::Local { downstream, .. } => downstream
5060 .as_ref()
5061 .map(|downstream| downstream.updates_tx.clone()),
5062 _ => None,
5063 });
5064 let this = cx.weak_entity();
5065 self.send_job(None, move |git_repo, mut cx| async move {
5066 match git_repo {
5067 RepositoryState::Local(LocalRepositoryState {
5068 backend,
5069 environment,
5070 ..
5071 }) => {
5072 // TODO would be nice to not have to do this manually
5073 let result = backend.stash_drop(index, environment).await;
5074 if result.is_ok()
5075 && let Ok(stash_entries) = backend.stash_entries().await
5076 {
5077 let snapshot = this.update(&mut cx, |this, cx| {
5078 this.snapshot.stash_entries = stash_entries;
5079 cx.emit(RepositoryEvent::StashEntriesChanged);
5080 this.snapshot.clone()
5081 })?;
5082 if let Some(updates_tx) = updates_tx {
5083 updates_tx
5084 .unbounded_send(DownstreamUpdate::UpdateRepository(snapshot))
5085 .ok();
5086 }
5087 }
5088
5089 result
5090 }
5091 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5092 client
5093 .request(proto::StashDrop {
5094 project_id: project_id.0,
5095 repository_id: id.to_proto(),
5096 stash_index: index.map(|i| i as u64),
5097 })
5098 .await
5099 .context("sending stash pop request")?;
5100 Ok(())
5101 }
5102 }
5103 })
5104 }
5105
5106 pub fn run_hook(&mut self, hook: RunHook, _cx: &mut App) -> oneshot::Receiver<Result<()>> {
5107 let id = self.id;
5108 self.send_job(
5109 Some(format!("git hook {}", hook.as_str()).into()),
5110 move |git_repo, _cx| async move {
5111 match git_repo {
5112 RepositoryState::Local(LocalRepositoryState {
5113 backend,
5114 environment,
5115 ..
5116 }) => backend.run_hook(hook, environment.clone()).await,
5117 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5118 client
5119 .request(proto::RunGitHook {
5120 project_id: project_id.0,
5121 repository_id: id.to_proto(),
5122 hook: hook.to_proto(),
5123 })
5124 .await?;
5125
5126 Ok(())
5127 }
5128 }
5129 },
5130 )
5131 }
5132
    /// Creates a commit with the given message and options, running the
    /// pre-commit hook first and aborting if it fails. For remote
    /// repositories the askpass delegate is registered for the duration of
    /// the RPC so credential prompts can be forwarded.
    pub fn commit(
        &mut self,
        message: SharedString,
        name_and_email: Option<(SharedString, SharedString)>,
        options: CommitOptions,
        askpass: AskPassDelegate,
        cx: &mut App,
    ) -> oneshot::Receiver<Result<()>> {
        let id = self.id;
        let askpass_delegates = self.askpass_delegates.clone();
        let askpass_id = util::post_inc(&mut self.latest_askpass_id);

        // Queue the pre-commit hook before the commit job; both go through the
        // same job queue, and the commit below waits on the hook's result.
        let rx = self.run_hook(RunHook::PreCommit, cx);

        self.send_job(Some("git commit".into()), move |git_repo, _cx| async move {
            // Abort the commit if the pre-commit hook failed.
            rx.await??;

            match git_repo {
                RepositoryState::Local(LocalRepositoryState {
                    backend,
                    environment,
                    ..
                }) => {
                    backend
                        .commit(message, name_and_email, options, askpass, environment)
                        .await
                }
                RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
                    // Register the askpass delegate for this request; the
                    // deferred cleanup removes it when the request finishes.
                    askpass_delegates.lock().insert(askpass_id, askpass);
                    let _defer = util::defer(|| {
                        let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
                        debug_assert!(askpass_delegate.is_some());
                    });
                    let (name, email) = name_and_email.unzip();
                    client
                        .request(proto::Commit {
                            project_id: project_id.0,
                            repository_id: id.to_proto(),
                            message: String::from(message),
                            name: name.map(String::from),
                            email: email.map(String::from),
                            options: Some(proto::commit::CommitOptions {
                                amend: options.amend,
                                signoff: options.signoff,
                            }),
                            askpass_id,
                        })
                        .await
                        .context("sending commit request")?;

                    Ok(())
                }
            }
        })
    }
5188
5189 pub fn fetch(
5190 &mut self,
5191 fetch_options: FetchOptions,
5192 askpass: AskPassDelegate,
5193 _cx: &mut App,
5194 ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
5195 let askpass_delegates = self.askpass_delegates.clone();
5196 let askpass_id = util::post_inc(&mut self.latest_askpass_id);
5197 let id = self.id;
5198
5199 self.send_job(Some("git fetch".into()), move |git_repo, cx| async move {
5200 match git_repo {
5201 RepositoryState::Local(LocalRepositoryState {
5202 backend,
5203 environment,
5204 ..
5205 }) => backend.fetch(fetch_options, askpass, environment, cx).await,
5206 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5207 askpass_delegates.lock().insert(askpass_id, askpass);
5208 let _defer = util::defer(|| {
5209 let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
5210 debug_assert!(askpass_delegate.is_some());
5211 });
5212
5213 let response = client
5214 .request(proto::Fetch {
5215 project_id: project_id.0,
5216 repository_id: id.to_proto(),
5217 askpass_id,
5218 remote: fetch_options.to_proto(),
5219 })
5220 .await
5221 .context("sending fetch request")?;
5222
5223 Ok(RemoteCommandOutput {
5224 stdout: response.stdout,
5225 stderr: response.stderr,
5226 })
5227 }
5228 }
5229 })
5230 }
5231
    /// Pushes `branch` to `remote_branch` on `remote`, optionally with
    /// `--set-upstream` or `--force-with-lease`. After a successful local
    /// push the head branch is re-scanned and the refreshed snapshot is
    /// forwarded downstream (for shared projects).
    pub fn push(
        &mut self,
        branch: SharedString,
        remote_branch: SharedString,
        remote: SharedString,
        options: Option<PushOptions>,
        askpass: AskPassDelegate,
        cx: &mut Context<Self>,
    ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
        let askpass_delegates = self.askpass_delegates.clone();
        let askpass_id = util::post_inc(&mut self.latest_askpass_id);
        let id = self.id;

        // Only used for the human-readable job status string.
        let args = options
            .map(|option| match option {
                PushOptions::SetUpstream => " --set-upstream",
                PushOptions::Force => " --force-with-lease",
            })
            .unwrap_or("");

        let updates_tx = self
            .git_store()
            .and_then(|git_store| match &git_store.read(cx).state {
                GitStoreState::Local { downstream, .. } => downstream
                    .as_ref()
                    .map(|downstream| downstream.updates_tx.clone()),
                _ => None,
            });

        let this = cx.weak_entity();
        self.send_job(
            Some(format!("git push {} {} {}:{}", args, remote, branch, remote_branch).into()),
            move |git_repo, mut cx| async move {
                match git_repo {
                    RepositoryState::Local(LocalRepositoryState {
                        backend,
                        environment,
                        ..
                    }) => {
                        let result = backend
                            .push(
                                branch.to_string(),
                                remote_branch.to_string(),
                                remote.to_string(),
                                options,
                                askpass,
                                environment.clone(),
                                cx.clone(),
                            )
                            .await;
                        // TODO would be nice to not have to do this manually
                        // Refresh the head branch (its upstream tracking may
                        // have changed) and propagate the new snapshot.
                        if result.is_ok() {
                            let branches = backend.branches().await?;
                            let branch = branches.into_iter().find(|branch| branch.is_head);
                            log::info!("head branch after scan is {branch:?}");
                            let snapshot = this.update(&mut cx, |this, cx| {
                                this.snapshot.branch = branch;
                                cx.emit(RepositoryEvent::BranchChanged);
                                this.snapshot.clone()
                            })?;
                            if let Some(updates_tx) = updates_tx {
                                updates_tx
                                    .unbounded_send(DownstreamUpdate::UpdateRepository(snapshot))
                                    .ok();
                            }
                        }
                        result
                    }
                    RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
                        // Register the askpass delegate for this request; the
                        // deferred cleanup removes it again on completion.
                        askpass_delegates.lock().insert(askpass_id, askpass);
                        let _defer = util::defer(|| {
                            let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
                            debug_assert!(askpass_delegate.is_some());
                        });
                        let response = client
                            .request(proto::Push {
                                project_id: project_id.0,
                                repository_id: id.to_proto(),
                                askpass_id,
                                branch_name: branch.to_string(),
                                remote_branch_name: remote_branch.to_string(),
                                remote_name: remote.to_string(),
                                options: options.map(|options| match options {
                                    PushOptions::Force => proto::push::PushOptions::Force,
                                    PushOptions::SetUpstream => {
                                        proto::push::PushOptions::SetUpstream
                                    }
                                }
                                as i32),
                            })
                            .await
                            .context("sending push request")?;

                        Ok(RemoteCommandOutput {
                            stdout: response.stdout,
                            stderr: response.stderr,
                        })
                    }
                }
            },
        )
    }
5334
5335 pub fn pull(
5336 &mut self,
5337 branch: Option<SharedString>,
5338 remote: SharedString,
5339 rebase: bool,
5340 askpass: AskPassDelegate,
5341 _cx: &mut App,
5342 ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
5343 let askpass_delegates = self.askpass_delegates.clone();
5344 let askpass_id = util::post_inc(&mut self.latest_askpass_id);
5345 let id = self.id;
5346
5347 let mut status = "git pull".to_string();
5348 if rebase {
5349 status.push_str(" --rebase");
5350 }
5351 status.push_str(&format!(" {}", remote));
5352 if let Some(b) = &branch {
5353 status.push_str(&format!(" {}", b));
5354 }
5355
5356 self.send_job(Some(status.into()), move |git_repo, cx| async move {
5357 match git_repo {
5358 RepositoryState::Local(LocalRepositoryState {
5359 backend,
5360 environment,
5361 ..
5362 }) => {
5363 backend
5364 .pull(
5365 branch.as_ref().map(|b| b.to_string()),
5366 remote.to_string(),
5367 rebase,
5368 askpass,
5369 environment.clone(),
5370 cx,
5371 )
5372 .await
5373 }
5374 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5375 askpass_delegates.lock().insert(askpass_id, askpass);
5376 let _defer = util::defer(|| {
5377 let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
5378 debug_assert!(askpass_delegate.is_some());
5379 });
5380 let response = client
5381 .request(proto::Pull {
5382 project_id: project_id.0,
5383 repository_id: id.to_proto(),
5384 askpass_id,
5385 rebase,
5386 branch_name: branch.as_ref().map(|b| b.to_string()),
5387 remote_name: remote.to_string(),
5388 })
5389 .await
5390 .context("sending pull request")?;
5391
5392 Ok(RemoteCommandOutput {
5393 stdout: response.stdout,
5394 stderr: response.stderr,
5395 })
5396 }
5397 }
5398 })
5399 }
5400
    /// Queues a job that overwrites the index ("staged") content of `path`;
    /// `content` of `None` removes the path from the index.
    ///
    /// The job is keyed with `GitJobKey::WriteIndex(path)`, so the worker loop
    /// can skip this write if a newer write for the same path is already queued.
    /// When `hunk_staging_operation_count` is provided, it is recorded on the
    /// buffer's diff state after the write completes, letting later diff
    /// recalculations know which hunk-staging operations this write reflects.
    fn spawn_set_index_text_job(
        &mut self,
        path: RepoPath,
        content: Option<String>,
        hunk_staging_operation_count: Option<usize>,
        cx: &mut Context<Self>,
    ) -> oneshot::Receiver<anyhow::Result<()>> {
        let id = self.id;
        let this = cx.weak_entity();
        let git_store = self.git_store.clone();
        // Resolve the absolute path up front, while we still have &self.
        let abs_path = self.snapshot.repo_path_to_abs_path(&path);
        self.send_keyed_job(
            Some(GitJobKey::WriteIndex(vec![path.clone()])),
            None,
            move |git_repo, mut cx| async move {
                log::debug!(
                    "start updating index text for buffer {}",
                    path.as_unix_str()
                );

                match git_repo {
                    RepositoryState::Local(LocalRepositoryState {
                        fs,
                        backend,
                        environment,
                        ..
                    }) => {
                        // Preserve the on-disk executable bit in the index entry;
                        // a missing or unreadable file counts as non-executable.
                        let executable = match fs.metadata(&abs_path).await {
                            Ok(Some(meta)) => meta.is_executable,
                            Ok(None) => false,
                            Err(_err) => false,
                        };
                        backend
                            .set_index_text(path.clone(), content, environment.clone(), executable)
                            .await?;
                    }
                    RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
                        client
                            .request(proto::SetIndexText {
                                project_id: project_id.0,
                                repository_id: id.to_proto(),
                                path: path.to_proto(),
                                text: content,
                            })
                            .await?;
                    }
                }
                log::debug!(
                    "finish updating index text for buffer {}",
                    path.as_unix_str()
                );

                if let Some(hunk_staging_operation_count) = hunk_staging_operation_count {
                    // Find the open buffer for this repo path (if any) and stamp
                    // its diff state with the operation count this write covers.
                    // Missing buffer/diff state is not an error (closure returns None).
                    let project_path = this
                        .read_with(&cx, |this, cx| this.repo_path_to_project_path(&path, cx))
                        .ok()
                        .flatten();
                    git_store
                        .update(&mut cx, |git_store, cx| {
                            let buffer_id = git_store
                                .buffer_store
                                .read(cx)
                                .get_by_path(&project_path?)?
                                .read(cx)
                                .remote_id();
                            let diff_state = git_store.diffs.get(&buffer_id)?;
                            diff_state.update(cx, |diff_state, _| {
                                diff_state.hunk_staging_operation_count_as_of_write =
                                    hunk_staging_operation_count;
                            });
                            Some(())
                        })
                        .context("Git store dropped")?;
                }
                Ok(())
            },
        )
    }
5479
5480 pub fn create_remote(
5481 &mut self,
5482 remote_name: String,
5483 remote_url: String,
5484 ) -> oneshot::Receiver<Result<()>> {
5485 let id = self.id;
5486 self.send_job(
5487 Some(format!("git remote add {remote_name} {remote_url}").into()),
5488 move |repo, _cx| async move {
5489 match repo {
5490 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5491 backend.create_remote(remote_name, remote_url).await
5492 }
5493 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5494 client
5495 .request(proto::GitCreateRemote {
5496 project_id: project_id.0,
5497 repository_id: id.to_proto(),
5498 remote_name,
5499 remote_url,
5500 })
5501 .await?;
5502
5503 Ok(())
5504 }
5505 }
5506 },
5507 )
5508 }
5509
5510 pub fn remove_remote(&mut self, remote_name: String) -> oneshot::Receiver<Result<()>> {
5511 let id = self.id;
5512 self.send_job(
5513 Some(format!("git remove remote {remote_name}").into()),
5514 move |repo, _cx| async move {
5515 match repo {
5516 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5517 backend.remove_remote(remote_name).await
5518 }
5519 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5520 client
5521 .request(proto::GitRemoveRemote {
5522 project_id: project_id.0,
5523 repository_id: id.to_proto(),
5524 remote_name,
5525 })
5526 .await?;
5527
5528 Ok(())
5529 }
5530 }
5531 },
5532 )
5533 }
5534
5535 pub fn get_remotes(
5536 &mut self,
5537 branch_name: Option<String>,
5538 is_push: bool,
5539 ) -> oneshot::Receiver<Result<Vec<Remote>>> {
5540 let id = self.id;
5541 self.send_job(None, move |repo, _cx| async move {
5542 match repo {
5543 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5544 let remote = if let Some(branch_name) = branch_name {
5545 if is_push {
5546 backend.get_push_remote(branch_name).await?
5547 } else {
5548 backend.get_branch_remote(branch_name).await?
5549 }
5550 } else {
5551 None
5552 };
5553
5554 match remote {
5555 Some(remote) => Ok(vec![remote]),
5556 None => backend.get_all_remotes().await,
5557 }
5558 }
5559 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5560 let response = client
5561 .request(proto::GetRemotes {
5562 project_id: project_id.0,
5563 repository_id: id.to_proto(),
5564 branch_name,
5565 is_push,
5566 })
5567 .await?;
5568
5569 let remotes = response
5570 .remotes
5571 .into_iter()
5572 .map(|remotes| Remote {
5573 name: remotes.name.into(),
5574 })
5575 .collect();
5576
5577 Ok(remotes)
5578 }
5579 }
5580 })
5581 }
5582
5583 pub fn branches(&mut self) -> oneshot::Receiver<Result<Vec<Branch>>> {
5584 let id = self.id;
5585 self.send_job(None, move |repo, _| async move {
5586 match repo {
5587 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5588 backend.branches().await
5589 }
5590 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5591 let response = client
5592 .request(proto::GitGetBranches {
5593 project_id: project_id.0,
5594 repository_id: id.to_proto(),
5595 })
5596 .await?;
5597
5598 let branches = response
5599 .branches
5600 .into_iter()
5601 .map(|branch| proto_to_branch(&branch))
5602 .collect();
5603
5604 Ok(branches)
5605 }
5606 }
5607 })
5608 }
5609
5610 pub fn worktrees(&mut self) -> oneshot::Receiver<Result<Vec<GitWorktree>>> {
5611 let id = self.id;
5612 self.send_job(None, move |repo, _| async move {
5613 match repo {
5614 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5615 backend.worktrees().await
5616 }
5617 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5618 let response = client
5619 .request(proto::GitGetWorktrees {
5620 project_id: project_id.0,
5621 repository_id: id.to_proto(),
5622 })
5623 .await?;
5624
5625 let worktrees = response
5626 .worktrees
5627 .into_iter()
5628 .map(|worktree| proto_to_worktree(&worktree))
5629 .collect();
5630
5631 Ok(worktrees)
5632 }
5633 }
5634 })
5635 }
5636
5637 pub fn create_worktree(
5638 &mut self,
5639 name: String,
5640 directory: PathBuf,
5641 commit: Option<String>,
5642 ) -> oneshot::Receiver<Result<()>> {
5643 let id = self.id;
5644 self.send_job(
5645 Some("git worktree add".into()),
5646 move |repo, _cx| async move {
5647 match repo {
5648 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5649 backend.create_worktree(name, directory, commit).await
5650 }
5651 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5652 client
5653 .request(proto::GitCreateWorktree {
5654 project_id: project_id.0,
5655 repository_id: id.to_proto(),
5656 name,
5657 directory: directory.to_string_lossy().to_string(),
5658 commit,
5659 })
5660 .await?;
5661
5662 Ok(())
5663 }
5664 }
5665 },
5666 )
5667 }
5668
5669 pub fn default_branch(
5670 &mut self,
5671 include_remote_name: bool,
5672 ) -> oneshot::Receiver<Result<Option<SharedString>>> {
5673 let id = self.id;
5674 self.send_job(None, move |repo, _| async move {
5675 match repo {
5676 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5677 backend.default_branch(include_remote_name).await
5678 }
5679 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5680 let response = client
5681 .request(proto::GetDefaultBranch {
5682 project_id: project_id.0,
5683 repository_id: id.to_proto(),
5684 })
5685 .await?;
5686
5687 anyhow::Ok(response.branch.map(SharedString::from))
5688 }
5689 }
5690 })
5691 }
5692
    /// Computes a tree-level diff (per-path add/modify/delete statuses)
    /// for the given `diff_type`.
    pub fn diff_tree(
        &mut self,
        diff_type: DiffTreeType,
        _cx: &App,
    ) -> oneshot::Receiver<Result<TreeDiff>> {
        let repository_id = self.snapshot.id;
        self.send_job(None, move |repo, _cx| async move {
            match repo {
                RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
                    backend.diff_tree(diff_type).await
                }
                RepositoryState::Remote(RemoteRepositoryState { client, project_id }) => {
                    let response = client
                        .request(proto::GetTreeDiff {
                            project_id: project_id.0,
                            repository_id: repository_id.0,
                            is_merge: matches!(diff_type, DiffTreeType::MergeBase { .. }),
                            base: diff_type.base().to_string(),
                            head: diff_type.head().to_string(),
                        })
                        .await?;

                    // Decode each entry; malformed entries (missing/invalid oid
                    // or path) are logged and dropped rather than failing the
                    // whole diff.
                    let entries = response
                        .entries
                        .into_iter()
                        .filter_map(|entry| {
                            let status = match entry.status() {
                                proto::tree_diff_status::Status::Added => TreeDiffStatus::Added,
                                proto::tree_diff_status::Status::Modified => {
                                    TreeDiffStatus::Modified {
                                        old: git::Oid::from_str(
                                            &entry.oid.context("missing oid").log_err()?,
                                        )
                                        .log_err()?,
                                    }
                                }
                                proto::tree_diff_status::Status::Deleted => {
                                    TreeDiffStatus::Deleted {
                                        old: git::Oid::from_str(
                                            &entry.oid.context("missing oid").log_err()?,
                                        )
                                        .log_err()?,
                                    }
                                }
                            };
                            Some((
                                RepoPath::from_rel_path(
                                    &RelPath::from_proto(&entry.path).log_err()?,
                                ),
                                status,
                            ))
                        })
                        .collect();

                    Ok(TreeDiff { entries })
                }
            }
        })
    }
5752
5753 pub fn diff(&mut self, diff_type: DiffType, _cx: &App) -> oneshot::Receiver<Result<String>> {
5754 let id = self.id;
5755 self.send_job(None, move |repo, _cx| async move {
5756 match repo {
5757 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5758 backend.diff(diff_type).await
5759 }
5760 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5761 let (proto_diff_type, merge_base_ref) = match &diff_type {
5762 DiffType::HeadToIndex => {
5763 (proto::git_diff::DiffType::HeadToIndex.into(), None)
5764 }
5765 DiffType::HeadToWorktree => {
5766 (proto::git_diff::DiffType::HeadToWorktree.into(), None)
5767 }
5768 DiffType::MergeBase { base_ref } => (
5769 proto::git_diff::DiffType::MergeBase.into(),
5770 Some(base_ref.to_string()),
5771 ),
5772 };
5773 let response = client
5774 .request(proto::GitDiff {
5775 project_id: project_id.0,
5776 repository_id: id.to_proto(),
5777 diff_type: proto_diff_type,
5778 merge_base_ref,
5779 })
5780 .await?;
5781
5782 Ok(response.diff)
5783 }
5784 }
5785 })
5786 }
5787
5788 /// Fetches per-line diff statistics (additions/deletions) via `git diff --numstat`.
5789 pub fn diff_stat(
5790 &mut self,
5791 diff_type: DiffType,
5792 _cx: &App,
5793 ) -> oneshot::Receiver<
5794 Result<collections::HashMap<git::repository::RepoPath, git::status::DiffStat>>,
5795 > {
5796 let id = self.id;
5797 self.send_job(None, move |repo, _cx| async move {
5798 match repo {
5799 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5800 backend.diff_stat(diff_type).await
5801 }
5802 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5803 let (proto_diff_type, merge_base_ref) = match &diff_type {
5804 DiffType::HeadToIndex => {
5805 (proto::git_diff_stat::DiffType::HeadToIndex.into(), None)
5806 }
5807 DiffType::HeadToWorktree => {
5808 (proto::git_diff_stat::DiffType::HeadToWorktree.into(), None)
5809 }
5810 DiffType::MergeBase { base_ref } => (
5811 proto::git_diff_stat::DiffType::MergeBase.into(),
5812 Some(base_ref.to_string()),
5813 ),
5814 };
5815 let response = client
5816 .request(proto::GitDiffStat {
5817 project_id: project_id.0,
5818 repository_id: id.to_proto(),
5819 diff_type: proto_diff_type,
5820 merge_base_ref,
5821 })
5822 .await?;
5823
5824 let stats = response
5825 .entries
5826 .into_iter()
5827 .filter_map(|entry| {
5828 let path = RepoPath::from_proto(&entry.path).log_err()?;
5829 Some((
5830 path,
5831 git::status::DiffStat {
5832 added: entry.added,
5833 deleted: entry.deleted,
5834 },
5835 ))
5836 })
5837 .collect();
5838
5839 Ok(stats)
5840 }
5841 }
5842 })
5843 }
5844
5845 pub fn create_branch(
5846 &mut self,
5847 branch_name: String,
5848 base_branch: Option<String>,
5849 ) -> oneshot::Receiver<Result<()>> {
5850 let id = self.id;
5851 let status_msg = if let Some(ref base) = base_branch {
5852 format!("git switch -c {branch_name} {base}").into()
5853 } else {
5854 format!("git switch -c {branch_name}").into()
5855 };
5856 self.send_job(Some(status_msg), move |repo, _cx| async move {
5857 match repo {
5858 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5859 backend.create_branch(branch_name, base_branch).await
5860 }
5861 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5862 client
5863 .request(proto::GitCreateBranch {
5864 project_id: project_id.0,
5865 repository_id: id.to_proto(),
5866 branch_name,
5867 })
5868 .await?;
5869
5870 Ok(())
5871 }
5872 }
5873 })
5874 }
5875
5876 pub fn change_branch(&mut self, branch_name: String) -> oneshot::Receiver<Result<()>> {
5877 let id = self.id;
5878 self.send_job(
5879 Some(format!("git switch {branch_name}").into()),
5880 move |repo, _cx| async move {
5881 match repo {
5882 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5883 backend.change_branch(branch_name).await
5884 }
5885 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5886 client
5887 .request(proto::GitChangeBranch {
5888 project_id: project_id.0,
5889 repository_id: id.to_proto(),
5890 branch_name,
5891 })
5892 .await?;
5893
5894 Ok(())
5895 }
5896 }
5897 },
5898 )
5899 }
5900
5901 pub fn delete_branch(&mut self, branch_name: String) -> oneshot::Receiver<Result<()>> {
5902 let id = self.id;
5903 self.send_job(
5904 Some(format!("git branch -d {branch_name}").into()),
5905 move |repo, _cx| async move {
5906 match repo {
5907 RepositoryState::Local(state) => state.backend.delete_branch(branch_name).await,
5908 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5909 client
5910 .request(proto::GitDeleteBranch {
5911 project_id: project_id.0,
5912 repository_id: id.to_proto(),
5913 branch_name,
5914 })
5915 .await?;
5916
5917 Ok(())
5918 }
5919 }
5920 },
5921 )
5922 }
5923
5924 pub fn rename_branch(
5925 &mut self,
5926 branch: String,
5927 new_name: String,
5928 ) -> oneshot::Receiver<Result<()>> {
5929 let id = self.id;
5930 self.send_job(
5931 Some(format!("git branch -m {branch} {new_name}").into()),
5932 move |repo, _cx| async move {
5933 match repo {
5934 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5935 backend.rename_branch(branch, new_name).await
5936 }
5937 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5938 client
5939 .request(proto::GitRenameBranch {
5940 project_id: project_id.0,
5941 repository_id: id.to_proto(),
5942 branch,
5943 new_name,
5944 })
5945 .await?;
5946
5947 Ok(())
5948 }
5949 }
5950 },
5951 )
5952 }
5953
5954 pub fn check_for_pushed_commits(&mut self) -> oneshot::Receiver<Result<Vec<SharedString>>> {
5955 let id = self.id;
5956 self.send_job(None, move |repo, _cx| async move {
5957 match repo {
5958 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5959 backend.check_for_pushed_commit().await
5960 }
5961 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5962 let response = client
5963 .request(proto::CheckForPushedCommits {
5964 project_id: project_id.0,
5965 repository_id: id.to_proto(),
5966 })
5967 .await?;
5968
5969 let branches = response.pushed_to.into_iter().map(Into::into).collect();
5970
5971 Ok(branches)
5972 }
5973 }
5974 })
5975 }
5976
5977 pub fn checkpoint(&mut self) -> oneshot::Receiver<Result<GitRepositoryCheckpoint>> {
5978 self.send_job(None, |repo, _cx| async move {
5979 match repo {
5980 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5981 backend.checkpoint().await
5982 }
5983 RepositoryState::Remote(..) => anyhow::bail!("not implemented yet"),
5984 }
5985 })
5986 }
5987
5988 pub fn restore_checkpoint(
5989 &mut self,
5990 checkpoint: GitRepositoryCheckpoint,
5991 ) -> oneshot::Receiver<Result<()>> {
5992 self.send_job(None, move |repo, _cx| async move {
5993 match repo {
5994 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5995 backend.restore_checkpoint(checkpoint).await
5996 }
5997 RepositoryState::Remote { .. } => anyhow::bail!("not implemented yet"),
5998 }
5999 })
6000 }
6001
    /// Applies a repository-state update received from the remote host,
    /// mutating the local snapshot and emitting change events for anything
    /// that actually differed.
    pub(crate) fn apply_remote_update(
        &mut self,
        update: proto::UpdateRepository,
        cx: &mut Context<Self>,
    ) -> Result<()> {
        // Paths that failed to decode are logged and dropped.
        let conflicted_paths = TreeSet::from_ordered_entries(
            update
                .current_merge_conflicts
                .into_iter()
                .filter_map(|path| RepoPath::from_proto(&path).log_err()),
        );
        let new_branch = update.branch_summary.as_ref().map(proto_to_branch);
        let new_head_commit = update
            .head_commit_details
            .as_ref()
            .map(proto_to_commit_details);
        // Compare before overwriting so the event only fires on real changes.
        if self.snapshot.branch != new_branch || self.snapshot.head_commit != new_head_commit {
            cx.emit(RepositoryEvent::BranchChanged)
        }
        self.snapshot.branch = new_branch;
        self.snapshot.head_commit = new_head_commit;

        self.snapshot.merge.conflicted_paths = conflicted_paths;
        self.snapshot.merge.message = update.merge_message.map(SharedString::from);
        let new_stash_entries = GitStash {
            entries: update
                .stash_entries
                .iter()
                .filter_map(|entry| proto_to_stash(entry).ok())
                .collect(),
        };
        if self.snapshot.stash_entries != new_stash_entries {
            cx.emit(RepositoryEvent::StashEntriesChanged)
        }
        self.snapshot.stash_entries = new_stash_entries;
        self.snapshot.remote_upstream_url = update.remote_upstream_url;
        self.snapshot.remote_origin_url = update.remote_origin_url;

        // Build one batch of removals + insertions for the status tree;
        // undecodable paths/statuses are logged and skipped.
        let edits = update
            .removed_statuses
            .into_iter()
            .filter_map(|path| {
                Some(sum_tree::Edit::Remove(PathKey(
                    RelPath::from_proto(&path).log_err()?,
                )))
            })
            .chain(
                update
                    .updated_statuses
                    .into_iter()
                    .filter_map(|updated_status| {
                        Some(sum_tree::Edit::Insert(updated_status.try_into().log_err()?))
                    }),
            )
            .collect::<Vec<_>>();
        if !edits.is_empty() {
            cx.emit(RepositoryEvent::StatusesChanged);
        }
        self.snapshot.statuses_by_path.edit(edits, ());
        // The scan id is only advanced once the final chunk of a multi-part
        // update has been applied.
        if update.is_last_update {
            self.snapshot.scan_id = update.scan_id;
        }
        self.clear_pending_ops(cx);
        Ok(())
    }
6067
6068 pub fn compare_checkpoints(
6069 &mut self,
6070 left: GitRepositoryCheckpoint,
6071 right: GitRepositoryCheckpoint,
6072 ) -> oneshot::Receiver<Result<bool>> {
6073 self.send_job(None, move |repo, _cx| async move {
6074 match repo {
6075 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
6076 backend.compare_checkpoints(left, right).await
6077 }
6078 RepositoryState::Remote { .. } => anyhow::bail!("not implemented yet"),
6079 }
6080 })
6081 }
6082
6083 pub fn diff_checkpoints(
6084 &mut self,
6085 base_checkpoint: GitRepositoryCheckpoint,
6086 target_checkpoint: GitRepositoryCheckpoint,
6087 ) -> oneshot::Receiver<Result<String>> {
6088 self.send_job(None, move |repo, _cx| async move {
6089 match repo {
6090 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
6091 backend
6092 .diff_checkpoints(base_checkpoint, target_checkpoint)
6093 .await
6094 }
6095 RepositoryState::Remote { .. } => anyhow::bail!("not implemented yet"),
6096 }
6097 })
6098 }
6099
    /// Drops all pending ops that are no longer running, keeping only
    /// entries that still have at least one running op.
    fn clear_pending_ops(&mut self, cx: &mut Context<Self>) {
        let updated = SumTree::from_iter(
            self.pending_ops.iter().filter_map(|ops| {
                // Retain only the still-running ops for each path.
                let inner_ops: Vec<PendingOp> =
                    ops.ops.iter().filter(|op| op.running()).cloned().collect();
                if inner_ops.is_empty() {
                    None
                } else {
                    Some(PendingOps {
                        repo_path: ops.repo_path.clone(),
                        ops: inner_ops,
                    })
                }
            }),
            (),
        );

        if updated != self.pending_ops {
            // NOTE(review): the event carries the *pre-update* ops
            // (`self.pending_ops` is only replaced below) — confirm listeners
            // expect the old set rather than `updated`.
            cx.emit(RepositoryEvent::PendingOpsChanged {
                pending_ops: self.pending_ops.clone(),
            })
        }

        self.pending_ops = updated;
    }
6125
    /// Schedules a full git status rescan as a keyed job
    /// (`GitJobKey::ReloadGitState`), so queued duplicate scans are skipped
    /// by the worker loop. On completion the new snapshot replaces the
    /// current one and is forwarded downstream via `updates_tx` if provided.
    fn schedule_scan(
        &mut self,
        updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
        cx: &mut Context<Self>,
    ) {
        let this = cx.weak_entity();
        let _ = self.send_keyed_job(
            Some(GitJobKey::ReloadGitState),
            None,
            |state, mut cx| async move {
                log::debug!("run scheduled git status scan");

                // If the repository entity is gone there is nothing to scan.
                let Some(this) = this.upgrade() else {
                    return Ok(());
                };
                let RepositoryState::Local(LocalRepositoryState { backend, .. }) = state else {
                    bail!("not a local repository")
                };
                // A full scan supersedes any queued incremental refreshes.
                let (snapshot, events) = this
                    .update(&mut cx, |this, _| {
                        this.paths_needing_status_update.clear();
                        compute_snapshot(
                            this.id,
                            this.work_directory_abs_path.clone(),
                            this.snapshot.clone(),
                            backend.clone(),
                        )
                    })
                    .await?;
                // NOTE(review): the Result of this update is discarded — an
                // error here (entity dropped mid-scan) is silently ignored.
                this.update(&mut cx, |this, cx| {
                    this.snapshot = snapshot.clone();
                    this.clear_pending_ops(cx);
                    for event in events {
                        cx.emit(event);
                    }
                });
                if let Some(updates_tx) = updates_tx {
                    updates_tx
                        .unbounded_send(DownstreamUpdate::UpdateRepository(snapshot))
                        .ok();
                }
                Ok(())
            },
        );
    }
6171
    /// Spawns the worker loop that executes git jobs for a local repository,
    /// returning the sender used to enqueue jobs.
    ///
    /// Jobs run strictly one at a time, in order. A *keyed* job is skipped
    /// when a newer job with the same key is already waiting in the queue,
    /// so only the latest of a burst of identical operations actually runs.
    fn spawn_local_git_worker(
        state: Shared<Task<Result<LocalRepositoryState, String>>>,
        cx: &mut Context<Self>,
    ) -> mpsc::UnboundedSender<GitJob> {
        let (job_tx, mut job_rx) = mpsc::unbounded::<GitJob>();

        cx.spawn(async move |_, cx| {
            // Wait for the repository backend to finish initializing.
            let state = state.await.map_err(|err| anyhow::anyhow!(err))?;
            if let Some(git_hosting_provider_registry) =
                cx.update(|cx| GitHostingProviderRegistry::try_global(cx))
            {
                git_hosting_providers::register_additional_providers(
                    git_hosting_provider_registry,
                    state.backend.clone(),
                )
                .await;
            }
            let state = RepositoryState::Local(state);
            let mut jobs = VecDeque::new();
            loop {
                // Drain everything currently queued so keyed-job dedup below
                // can see the full backlog.
                while let Ok(Some(next_job)) = job_rx.try_next() {
                    jobs.push_back(next_job);
                }

                if let Some(job) = jobs.pop_front() {
                    // Skip this keyed job if a newer job with the same key is
                    // still queued behind it.
                    if let Some(current_key) = &job.key
                        && jobs
                            .iter()
                            .any(|other_job| other_job.key.as_ref() == Some(current_key))
                    {
                        continue;
                    }
                    (job.job)(state.clone(), cx).await;
                } else if let Some(job) = job_rx.next().await {
                    jobs.push_back(job);
                } else {
                    // All senders dropped; shut the worker down.
                    break;
                }
            }
            anyhow::Ok(())
        })
        .detach_and_log_err(cx);

        job_tx
    }
6217
    /// Spawns the worker loop that executes git jobs for a remote
    /// (collaboration) repository, returning the sender used to enqueue jobs.
    /// Same queueing semantics as `spawn_local_git_worker`: sequential
    /// execution, with keyed jobs skipped when a newer same-key job is queued.
    fn spawn_remote_git_worker(
        state: RemoteRepositoryState,
        cx: &mut Context<Self>,
    ) -> mpsc::UnboundedSender<GitJob> {
        let (job_tx, mut job_rx) = mpsc::unbounded::<GitJob>();

        cx.spawn(async move |_, cx| {
            let state = RepositoryState::Remote(state);
            let mut jobs = VecDeque::new();
            loop {
                // Drain everything currently queued so keyed-job dedup below
                // can see the full backlog.
                while let Ok(Some(next_job)) = job_rx.try_next() {
                    jobs.push_back(next_job);
                }

                if let Some(job) = jobs.pop_front() {
                    // Skip this keyed job if a newer job with the same key is
                    // still queued behind it.
                    if let Some(current_key) = &job.key
                        && jobs
                            .iter()
                            .any(|other_job| other_job.key.as_ref() == Some(current_key))
                    {
                        continue;
                    }
                    (job.job)(state.clone(), cx).await;
                } else if let Some(job) = job_rx.next().await {
                    jobs.push_back(job);
                } else {
                    // All senders dropped; shut the worker down.
                    break;
                }
            }
            anyhow::Ok(())
        })
        .detach_and_log_err(cx);

        job_tx
    }
6253
6254 fn load_staged_text(
6255 &mut self,
6256 buffer_id: BufferId,
6257 repo_path: RepoPath,
6258 cx: &App,
6259 ) -> Task<Result<Option<String>>> {
6260 let rx = self.send_job(None, move |state, _| async move {
6261 match state {
6262 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
6263 anyhow::Ok(backend.load_index_text(repo_path).await)
6264 }
6265 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
6266 let response = client
6267 .request(proto::OpenUnstagedDiff {
6268 project_id: project_id.to_proto(),
6269 buffer_id: buffer_id.to_proto(),
6270 })
6271 .await?;
6272 Ok(response.staged_text)
6273 }
6274 }
6275 });
6276 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
6277 }
6278
    /// Loads both the committed (HEAD) and staged (index) text for
    /// `repo_path`, collapsing them into a single `DiffBasesChange`:
    /// `SetBoth` when index and HEAD agree, `SetEach` when they differ.
    fn load_committed_text(
        &mut self,
        buffer_id: BufferId,
        repo_path: RepoPath,
        cx: &App,
    ) -> Task<Result<DiffBasesChange>> {
        let rx = self.send_job(None, move |state, _| async move {
            match state {
                RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
                    let committed_text = backend.load_committed_text(repo_path.clone()).await;
                    let staged_text = backend.load_index_text(repo_path).await;
                    // When index matches HEAD, send a single shared base to
                    // avoid carrying the same text twice.
                    let diff_bases_change = if committed_text == staged_text {
                        DiffBasesChange::SetBoth(committed_text)
                    } else {
                        DiffBasesChange::SetEach {
                            index: staged_text,
                            head: committed_text,
                        }
                    };
                    anyhow::Ok(diff_bases_change)
                }
                RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
                    use proto::open_uncommitted_diff_response::Mode;

                    let response = client
                        .request(proto::OpenUncommittedDiff {
                            project_id: project_id.to_proto(),
                            buffer_id: buffer_id.to_proto(),
                        })
                        .await?;
                    let mode = Mode::from_i32(response.mode).context("Invalid mode")?;
                    let bases = match mode {
                        Mode::IndexMatchesHead => DiffBasesChange::SetBoth(response.committed_text),
                        Mode::IndexAndHead => DiffBasesChange::SetEach {
                            head: response.committed_text,
                            index: response.staged_text,
                        },
                    };
                    Ok(bases)
                }
            }
        });

        cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
    }
6324
6325 fn load_blob_content(&mut self, oid: Oid, cx: &App) -> Task<Result<String>> {
6326 let repository_id = self.snapshot.id;
6327 let rx = self.send_job(None, move |state, _| async move {
6328 match state {
6329 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
6330 backend.load_blob_content(oid).await
6331 }
6332 RepositoryState::Remote(RemoteRepositoryState { client, project_id }) => {
6333 let response = client
6334 .request(proto::GetBlobContent {
6335 project_id: project_id.to_proto(),
6336 repository_id: repository_id.0,
6337 oid: oid.to_string(),
6338 })
6339 .await?;
6340 Ok(response.content)
6341 }
6342 }
6343 });
6344 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
6345 }
6346
    /// Records `paths` as needing a status refresh and schedules the refresh
    /// as a keyed job (`GitJobKey::RefreshStatuses`), so bursts of file
    /// events collapse into a single scan over the accumulated paths.
    /// Only changed entries are edited into the status tree; the updated
    /// snapshot is forwarded downstream via `updates_tx` when provided.
    fn paths_changed(
        &mut self,
        paths: Vec<RepoPath>,
        updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
        cx: &mut Context<Self>,
    ) {
        if !paths.is_empty() {
            self.paths_needing_status_update.push(paths);
        }

        let this = cx.weak_entity();
        let _ = self.send_keyed_job(
            Some(GitJobKey::RefreshStatuses),
            None,
            |state, mut cx| async move {
                // Take ownership of all accumulated path batches; a skipped
                // duplicate of this job will then see an empty list.
                let (prev_snapshot, changed_paths) = this.update(&mut cx, |this, _| {
                    (
                        this.snapshot.clone(),
                        mem::take(&mut this.paths_needing_status_update),
                    )
                })?;
                let RepositoryState::Local(LocalRepositoryState { backend, .. }) = state else {
                    bail!("not a local repository")
                };

                if changed_paths.is_empty() {
                    return Ok(());
                }

                let stash_entries = backend.stash_entries().await?;
                let changed_path_statuses = cx
                    .background_spawn(async move {
                        // Flatten and dedupe the batches before querying git.
                        let mut changed_paths =
                            changed_paths.into_iter().flatten().collect::<BTreeSet<_>>();
                        let statuses = backend
                            .status(&changed_paths.iter().cloned().collect::<Vec<_>>())
                            .await?;
                        let mut changed_path_statuses = Vec::new();
                        let prev_statuses = prev_snapshot.statuses_by_path.clone();
                        let mut cursor = prev_statuses.cursor::<PathProgress>(());

                        // Insert entries whose status differs from the previous
                        // snapshot; paths git reported on are removed from the
                        // set so only the leftovers are treated as deletions.
                        for (repo_path, status) in &*statuses.entries {
                            changed_paths.remove(repo_path);
                            if cursor.seek_forward(&PathTarget::Path(repo_path), Bias::Left)
                                && cursor.item().is_some_and(|entry| entry.status == *status)
                            {
                                continue;
                            }

                            changed_path_statuses.push(Edit::Insert(StatusEntry {
                                repo_path: repo_path.clone(),
                                status: *status,
                            }));
                        }
                        // Paths git no longer reports a status for are removed
                        // from the tree if they were previously present.
                        let mut cursor = prev_statuses.cursor::<PathProgress>(());
                        for path in changed_paths.into_iter() {
                            if cursor.seek_forward(&PathTarget::Path(&path), Bias::Left) {
                                changed_path_statuses
                                    .push(Edit::Remove(PathKey(path.as_ref().clone())));
                            }
                        }
                        anyhow::Ok(changed_path_statuses)
                    })
                    .await?;

                this.update(&mut cx, |this, cx| {
                    if this.snapshot.stash_entries != stash_entries {
                        cx.emit(RepositoryEvent::StashEntriesChanged);
                        this.snapshot.stash_entries = stash_entries;
                    }

                    if !changed_path_statuses.is_empty() {
                        cx.emit(RepositoryEvent::StatusesChanged);
                        this.snapshot
                            .statuses_by_path
                            .edit(changed_path_statuses, ());
                        this.snapshot.scan_id += 1;
                    }

                    if let Some(updates_tx) = updates_tx {
                        updates_tx
                            .unbounded_send(DownstreamUpdate::UpdateRepository(
                                this.snapshot.clone(),
                            ))
                            .ok();
                    }
                })
            },
        );
    }
6437
6438 /// currently running git command and when it started
6439 pub fn current_job(&self) -> Option<JobInfo> {
6440 self.active_jobs.values().next().cloned()
6441 }
6442
6443 pub fn barrier(&mut self) -> oneshot::Receiver<()> {
6444 self.send_job(None, |_, _| async {})
6445 }
6446
    /// Runs `f` while tracking a pending op (with the given `git_status`) for
    /// each of `paths`, so the UI can show in-flight git operations per path.
    /// On completion, each op is marked `Finished`, `Skipped` (when the job
    /// was canceled), or `Error`.
    fn spawn_job_with_tracking<AsyncFn>(
        &mut self,
        paths: Vec<RepoPath>,
        git_status: pending_op::GitStatus,
        cx: &mut Context<Self>,
        f: AsyncFn,
    ) -> Task<Result<()>>
    where
        AsyncFn: AsyncFnOnce(WeakEntity<Repository>, &mut AsyncApp) -> Result<()> + 'static,
    {
        // Register a Running op per path before the work starts.
        let ids = self.new_pending_ops_for_paths(paths, git_status);

        cx.spawn(async move |this, cx| {
            // Cancellation is reported as success with a Skipped status;
            // any other error marks the ops as Error and is propagated.
            let (job_status, result) = match f(this.clone(), cx).await {
                Ok(()) => (pending_op::JobStatus::Finished, Ok(())),
                Err(err) if err.is::<Canceled>() => (pending_op::JobStatus::Skipped, Ok(())),
                Err(err) => (pending_op::JobStatus::Error, Err(err)),
            };

            // Stamp the final status onto each op we created above.
            this.update(cx, |this, _| {
                let mut edits = Vec::with_capacity(ids.len());
                for (id, entry) in ids {
                    if let Some(mut ops) = this
                        .pending_ops
                        .get(&PathKey(entry.as_ref().clone()), ())
                        .cloned()
                    {
                        if let Some(op) = ops.op_by_id_mut(id) {
                            op.job_status = job_status;
                        }
                        edits.push(sum_tree::Edit::Insert(ops));
                    }
                }
                this.pending_ops.edit(edits, ());
            })?;

            result
        })
    }
6486
    /// Appends a new `Running` pending op (with the given `git_status`) to
    /// each path's op list, creating the list if the path had none, and
    /// returns the (op id, path) pairs so callers can later mark them done.
    fn new_pending_ops_for_paths(
        &mut self,
        paths: Vec<RepoPath>,
        git_status: pending_op::GitStatus,
    ) -> Vec<(PendingOpId, RepoPath)> {
        let mut edits = Vec::with_capacity(paths.len());
        let mut ids = Vec::with_capacity(paths.len());
        for path in paths {
            let mut ops = self
                .pending_ops
                .get(&PathKey(path.as_ref().clone()), ())
                .cloned()
                .unwrap_or_else(|| PendingOps::new(&path));
            // Ids are monotonically increasing within a path's op list.
            let id = ops.max_id() + 1;
            ops.ops.push(PendingOp {
                id,
                git_status,
                job_status: pending_op::JobStatus::Running,
            });
            edits.push(sum_tree::Edit::Insert(ops));
            ids.push((id, path));
        }
        self.pending_ops.edit(edits, ());
        ids
    }
6512 pub fn default_remote_url(&self) -> Option<String> {
6513 self.remote_upstream_url
6514 .clone()
6515 .or(self.remote_origin_url.clone())
6516 }
6517}
6518
6519fn get_permalink_in_rust_registry_src(
6520 provider_registry: Arc<GitHostingProviderRegistry>,
6521 path: PathBuf,
6522 selection: Range<u32>,
6523) -> Result<url::Url> {
6524 #[derive(Deserialize)]
6525 struct CargoVcsGit {
6526 sha1: String,
6527 }
6528
6529 #[derive(Deserialize)]
6530 struct CargoVcsInfo {
6531 git: CargoVcsGit,
6532 path_in_vcs: String,
6533 }
6534
6535 #[derive(Deserialize)]
6536 struct CargoPackage {
6537 repository: String,
6538 }
6539
6540 #[derive(Deserialize)]
6541 struct CargoToml {
6542 package: CargoPackage,
6543 }
6544
6545 let Some((dir, cargo_vcs_info_json)) = path.ancestors().skip(1).find_map(|dir| {
6546 let json = std::fs::read_to_string(dir.join(".cargo_vcs_info.json")).ok()?;
6547 Some((dir, json))
6548 }) else {
6549 bail!("No .cargo_vcs_info.json found in parent directories")
6550 };
6551 let cargo_vcs_info = serde_json::from_str::<CargoVcsInfo>(&cargo_vcs_info_json)?;
6552 let cargo_toml = std::fs::read_to_string(dir.join("Cargo.toml"))?;
6553 let manifest = toml::from_str::<CargoToml>(&cargo_toml)?;
6554 let (provider, remote) = parse_git_remote_url(provider_registry, &manifest.package.repository)
6555 .context("parsing package.repository field of manifest")?;
6556 let path = PathBuf::from(cargo_vcs_info.path_in_vcs).join(path.strip_prefix(dir).unwrap());
6557 let permalink = provider.build_permalink(
6558 remote,
6559 BuildPermalinkParams::new(
6560 &cargo_vcs_info.git.sha1,
6561 &RepoPath::from_rel_path(
6562 &RelPath::new(&path, PathStyle::local()).context("invalid path")?,
6563 ),
6564 Some(selection),
6565 ),
6566 );
6567 Ok(permalink)
6568}
6569
6570fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
6571 let Some(blame) = blame else {
6572 return proto::BlameBufferResponse {
6573 blame_response: None,
6574 };
6575 };
6576
6577 let entries = blame
6578 .entries
6579 .into_iter()
6580 .map(|entry| proto::BlameEntry {
6581 sha: entry.sha.as_bytes().into(),
6582 start_line: entry.range.start,
6583 end_line: entry.range.end,
6584 original_line_number: entry.original_line_number,
6585 author: entry.author,
6586 author_mail: entry.author_mail,
6587 author_time: entry.author_time,
6588 author_tz: entry.author_tz,
6589 committer: entry.committer_name,
6590 committer_mail: entry.committer_email,
6591 committer_time: entry.committer_time,
6592 committer_tz: entry.committer_tz,
6593 summary: entry.summary,
6594 previous: entry.previous,
6595 filename: entry.filename,
6596 })
6597 .collect::<Vec<_>>();
6598
6599 let messages = blame
6600 .messages
6601 .into_iter()
6602 .map(|(oid, message)| proto::CommitMessage {
6603 oid: oid.as_bytes().into(),
6604 message,
6605 })
6606 .collect::<Vec<_>>();
6607
6608 proto::BlameBufferResponse {
6609 blame_response: Some(proto::blame_buffer_response::BlameResponse { entries, messages }),
6610 }
6611}
6612
6613fn deserialize_blame_buffer_response(
6614 response: proto::BlameBufferResponse,
6615) -> Option<git::blame::Blame> {
6616 let response = response.blame_response?;
6617 let entries = response
6618 .entries
6619 .into_iter()
6620 .filter_map(|entry| {
6621 Some(git::blame::BlameEntry {
6622 sha: git::Oid::from_bytes(&entry.sha).ok()?,
6623 range: entry.start_line..entry.end_line,
6624 original_line_number: entry.original_line_number,
6625 committer_name: entry.committer,
6626 committer_time: entry.committer_time,
6627 committer_tz: entry.committer_tz,
6628 committer_email: entry.committer_mail,
6629 author: entry.author,
6630 author_mail: entry.author_mail,
6631 author_time: entry.author_time,
6632 author_tz: entry.author_tz,
6633 summary: entry.summary,
6634 previous: entry.previous,
6635 filename: entry.filename,
6636 })
6637 })
6638 .collect::<Vec<_>>();
6639
6640 let messages = response
6641 .messages
6642 .into_iter()
6643 .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
6644 .collect::<HashMap<_, _>>();
6645
6646 Some(Blame { entries, messages })
6647}
6648
6649fn branch_to_proto(branch: &git::repository::Branch) -> proto::Branch {
6650 proto::Branch {
6651 is_head: branch.is_head,
6652 ref_name: branch.ref_name.to_string(),
6653 unix_timestamp: branch
6654 .most_recent_commit
6655 .as_ref()
6656 .map(|commit| commit.commit_timestamp as u64),
6657 upstream: branch.upstream.as_ref().map(|upstream| proto::GitUpstream {
6658 ref_name: upstream.ref_name.to_string(),
6659 tracking: upstream
6660 .tracking
6661 .status()
6662 .map(|upstream| proto::UpstreamTracking {
6663 ahead: upstream.ahead as u64,
6664 behind: upstream.behind as u64,
6665 }),
6666 }),
6667 most_recent_commit: branch
6668 .most_recent_commit
6669 .as_ref()
6670 .map(|commit| proto::CommitSummary {
6671 sha: commit.sha.to_string(),
6672 subject: commit.subject.to_string(),
6673 commit_timestamp: commit.commit_timestamp,
6674 author_name: commit.author_name.to_string(),
6675 }),
6676 }
6677}
6678
6679fn worktree_to_proto(worktree: &git::repository::Worktree) -> proto::Worktree {
6680 proto::Worktree {
6681 path: worktree.path.to_string_lossy().to_string(),
6682 ref_name: worktree.ref_name.to_string(),
6683 sha: worktree.sha.to_string(),
6684 }
6685}
6686
6687fn proto_to_worktree(proto: &proto::Worktree) -> git::repository::Worktree {
6688 git::repository::Worktree {
6689 path: PathBuf::from(proto.path.clone()),
6690 ref_name: proto.ref_name.clone().into(),
6691 sha: proto.sha.clone().into(),
6692 }
6693}
6694
6695fn proto_to_branch(proto: &proto::Branch) -> git::repository::Branch {
6696 git::repository::Branch {
6697 is_head: proto.is_head,
6698 ref_name: proto.ref_name.clone().into(),
6699 upstream: proto
6700 .upstream
6701 .as_ref()
6702 .map(|upstream| git::repository::Upstream {
6703 ref_name: upstream.ref_name.to_string().into(),
6704 tracking: upstream
6705 .tracking
6706 .as_ref()
6707 .map(|tracking| {
6708 git::repository::UpstreamTracking::Tracked(UpstreamTrackingStatus {
6709 ahead: tracking.ahead as u32,
6710 behind: tracking.behind as u32,
6711 })
6712 })
6713 .unwrap_or(git::repository::UpstreamTracking::Gone),
6714 }),
6715 most_recent_commit: proto.most_recent_commit.as_ref().map(|commit| {
6716 git::repository::CommitSummary {
6717 sha: commit.sha.to_string().into(),
6718 subject: commit.subject.to_string().into(),
6719 commit_timestamp: commit.commit_timestamp,
6720 author_name: commit.author_name.to_string().into(),
6721 has_parent: true,
6722 }
6723 }),
6724 }
6725}
6726
6727fn commit_details_to_proto(commit: &CommitDetails) -> proto::GitCommitDetails {
6728 proto::GitCommitDetails {
6729 sha: commit.sha.to_string(),
6730 message: commit.message.to_string(),
6731 commit_timestamp: commit.commit_timestamp,
6732 author_email: commit.author_email.to_string(),
6733 author_name: commit.author_name.to_string(),
6734 }
6735}
6736
6737fn proto_to_commit_details(proto: &proto::GitCommitDetails) -> CommitDetails {
6738 CommitDetails {
6739 sha: proto.sha.clone().into(),
6740 message: proto.message.clone().into(),
6741 commit_timestamp: proto.commit_timestamp,
6742 author_email: proto.author_email.clone().into(),
6743 author_name: proto.author_name.clone().into(),
6744 }
6745}
6746
/// Recomputes a full `RepositorySnapshot` by querying the git `backend`,
/// comparing against `prev_snapshot` to produce the `RepositoryEvent`s
/// describing what changed (merge heads, statuses, branch/head).
async fn compute_snapshot(
    id: RepositoryId,
    work_directory_abs_path: Arc<Path>,
    prev_snapshot: RepositorySnapshot,
    backend: Arc<dyn GitRepository>,
) -> Result<(RepositorySnapshot, Vec<RepositoryEvent>)> {
    let mut events = Vec::new();
    let branches = backend.branches().await?;
    let branch = branches.into_iter().find(|branch| branch.is_head);
    // Query statuses for "." — the repository root.
    let statuses = backend
        .status(&[RepoPath::from_rel_path(
            &RelPath::new(".".as_ref(), PathStyle::local()).unwrap(),
        )])
        .await?;
    let stash_entries = backend.stash_entries().await?;
    let statuses_by_path = SumTree::from_iter(
        statuses
            .entries
            .iter()
            .map(|(repo_path, status)| StatusEntry {
                repo_path: repo_path.clone(),
                status: *status,
            }),
        (),
    );
    let (merge_details, merge_heads_changed) =
        MergeDetails::load(&backend, &statuses_by_path, &prev_snapshot).await?;
    log::debug!("new merge details (changed={merge_heads_changed:?}): {merge_details:?}");

    if merge_heads_changed {
        events.push(RepositoryEvent::MergeHeadsChanged);
    }

    if statuses_by_path != prev_snapshot.statuses_by_path {
        events.push(RepositoryEvent::StatusesChanged)
    }

    // Useful when branch is None in detached head state
    let head_commit = match backend.head_sha().await {
        Some(head_sha) => backend.show(head_sha).await.log_err(),
        None => None,
    };

    if branch != prev_snapshot.branch || head_commit != prev_snapshot.head_commit {
        events.push(RepositoryEvent::BranchChanged);
    }

    let remote_origin_url = backend.remote_url("origin").await;
    let remote_upstream_url = backend.remote_url("upstream").await;

    // Carry over the path style and bump the scan id on every recompute.
    let snapshot = RepositorySnapshot {
        id,
        statuses_by_path,
        work_directory_abs_path,
        path_style: prev_snapshot.path_style,
        scan_id: prev_snapshot.scan_id + 1,
        branch,
        head_commit,
        merge: merge_details,
        remote_origin_url,
        remote_upstream_url,
        stash_entries,
    };

    Ok((snapshot, events))
}
6813
/// Decodes a `FileStatus` from its protobuf representation.
///
/// If the structured `status` variant is present it takes precedence;
/// otherwise the coarse `simple_status` code is decoded instead.
///
/// # Errors
///
/// Returns an error when either encoding carries a status code that is
/// invalid in its position.
fn status_from_proto(
    simple_status: i32,
    status: Option<proto::GitFileStatus>,
) -> anyhow::Result<FileStatus> {
    use proto::git_file_status::Variant;

    // Fallback path: no structured variant, decode the simple code. Only a
    // subset of codes is representable here.
    let Some(variant) = status.and_then(|status| status.variant) else {
        let code = proto::GitStatus::from_i32(simple_status)
            .with_context(|| format!("Invalid git status code: {simple_status}"))?;
        let result = match code {
            proto::GitStatus::Added => TrackedStatus {
                worktree_status: StatusCode::Added,
                index_status: StatusCode::Unmodified,
            }
            .into(),
            proto::GitStatus::Modified => TrackedStatus {
                worktree_status: StatusCode::Modified,
                index_status: StatusCode::Unmodified,
            }
            .into(),
            proto::GitStatus::Conflict => UnmergedStatus {
                first_head: UnmergedStatusCode::Updated,
                second_head: UnmergedStatusCode::Updated,
            }
            .into(),
            proto::GitStatus::Deleted => TrackedStatus {
                worktree_status: StatusCode::Deleted,
                index_status: StatusCode::Unmodified,
            }
            .into(),
            _ => anyhow::bail!("Invalid code for simple status: {simple_status}"),
        };
        return Ok(result);
    };

    let result = match variant {
        Variant::Untracked(_) => FileStatus::Untracked,
        Variant::Ignored(_) => FileStatus::Ignored,
        Variant::Unmerged(unmerged) => {
            // Decode both heads with the same mapping, failing if either
            // carries a code that is invalid for an unmerged status.
            let [first_head, second_head] =
                [unmerged.first_head, unmerged.second_head].map(|head| {
                    let code = proto::GitStatus::from_i32(head)
                        .with_context(|| format!("Invalid git status code: {head}"))?;
                    let result = match code {
                        proto::GitStatus::Added => UnmergedStatusCode::Added,
                        proto::GitStatus::Updated => UnmergedStatusCode::Updated,
                        proto::GitStatus::Deleted => UnmergedStatusCode::Deleted,
                        _ => anyhow::bail!("Invalid code for unmerged status: {code:?}"),
                    };
                    Ok(result)
                });
            let [first_head, second_head] = [first_head?, second_head?];
            UnmergedStatus {
                first_head,
                second_head,
            }
            .into()
        }
        Variant::Tracked(tracked) => {
            // Decode index and worktree columns with the same mapping,
            // failing if either carries a code invalid for a tracked status.
            let [index_status, worktree_status] = [tracked.index_status, tracked.worktree_status]
                .map(|status| {
                    let code = proto::GitStatus::from_i32(status)
                        .with_context(|| format!("Invalid git status code: {status}"))?;
                    let result = match code {
                        proto::GitStatus::Modified => StatusCode::Modified,
                        proto::GitStatus::TypeChanged => StatusCode::TypeChanged,
                        proto::GitStatus::Added => StatusCode::Added,
                        proto::GitStatus::Deleted => StatusCode::Deleted,
                        proto::GitStatus::Renamed => StatusCode::Renamed,
                        proto::GitStatus::Copied => StatusCode::Copied,
                        proto::GitStatus::Unmodified => StatusCode::Unmodified,
                        _ => anyhow::bail!("Invalid code for tracked status: {code:?}"),
                    };
                    Ok(result)
                });
            let [index_status, worktree_status] = [index_status?, worktree_status?];
            TrackedStatus {
                index_status,
                worktree_status,
            }
            .into()
        }
    };
    Ok(result)
}
6899
6900fn status_to_proto(status: FileStatus) -> proto::GitFileStatus {
6901 use proto::git_file_status::{Tracked, Unmerged, Variant};
6902
6903 let variant = match status {
6904 FileStatus::Untracked => Variant::Untracked(Default::default()),
6905 FileStatus::Ignored => Variant::Ignored(Default::default()),
6906 FileStatus::Unmerged(UnmergedStatus {
6907 first_head,
6908 second_head,
6909 }) => Variant::Unmerged(Unmerged {
6910 first_head: unmerged_status_to_proto(first_head),
6911 second_head: unmerged_status_to_proto(second_head),
6912 }),
6913 FileStatus::Tracked(TrackedStatus {
6914 index_status,
6915 worktree_status,
6916 }) => Variant::Tracked(Tracked {
6917 index_status: tracked_status_to_proto(index_status),
6918 worktree_status: tracked_status_to_proto(worktree_status),
6919 }),
6920 };
6921 proto::GitFileStatus {
6922 variant: Some(variant),
6923 }
6924}
6925
6926fn unmerged_status_to_proto(code: UnmergedStatusCode) -> i32 {
6927 match code {
6928 UnmergedStatusCode::Added => proto::GitStatus::Added as _,
6929 UnmergedStatusCode::Deleted => proto::GitStatus::Deleted as _,
6930 UnmergedStatusCode::Updated => proto::GitStatus::Updated as _,
6931 }
6932}
6933
6934fn tracked_status_to_proto(code: StatusCode) -> i32 {
6935 match code {
6936 StatusCode::Added => proto::GitStatus::Added as _,
6937 StatusCode::Deleted => proto::GitStatus::Deleted as _,
6938 StatusCode::Modified => proto::GitStatus::Modified as _,
6939 StatusCode::Renamed => proto::GitStatus::Renamed as _,
6940 StatusCode::TypeChanged => proto::GitStatus::TypeChanged as _,
6941 StatusCode::Copied => proto::GitStatus::Copied as _,
6942 StatusCode::Unmodified => proto::GitStatus::Unmodified as _,
6943 }
6944}