1pub mod branch_diff;
2mod conflict_set;
3pub mod git_traversal;
4pub mod pending_op;
5
6use crate::{
7 ProjectEnvironment, ProjectItem, ProjectPath,
8 buffer_store::{BufferStore, BufferStoreEvent},
9 worktree_store::{WorktreeStore, WorktreeStoreEvent},
10};
11use anyhow::{Context as _, Result, anyhow, bail};
12use askpass::{AskPassDelegate, EncryptedPassword, IKnowWhatIAmDoingAndIHaveReadTheDocs};
13use buffer_diff::{BufferDiff, BufferDiffEvent};
14use client::ProjectId;
15use collections::HashMap;
16pub use conflict_set::{ConflictRegion, ConflictSet, ConflictSetSnapshot, ConflictSetUpdate};
17use fs::Fs;
18use futures::{
19 FutureExt, StreamExt,
20 channel::{
21 mpsc,
22 oneshot::{self, Canceled},
23 },
24 future::{self, Shared},
25 stream::FuturesOrdered,
26};
27use git::{
28 BuildPermalinkParams, GitHostingProviderRegistry, Oid, RunHook,
29 blame::Blame,
30 parse_git_remote_url,
31 repository::{
32 Branch, CommitDetails, CommitDiff, CommitFile, CommitOptions, DiffType, FetchOptions,
33 GitRepository, GitRepositoryCheckpoint, GraphCommitData, InitialGraphCommitData, LogOrder,
34 LogSource, PushOptions, Remote, RemoteCommandOutput, RepoPath, ResetMode,
35 UpstreamTrackingStatus, Worktree as GitWorktree,
36 },
37 stash::{GitStash, StashEntry},
38 status::{
39 DiffTreeType, FileStatus, GitSummary, StatusCode, TrackedStatus, TreeDiff, TreeDiffStatus,
40 UnmergedStatus, UnmergedStatusCode,
41 },
42};
43use gpui::{
44 App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Subscription, Task,
45 WeakEntity,
46};
47use language::{
48 Buffer, BufferEvent, Language, LanguageRegistry,
49 proto::{deserialize_version, serialize_version},
50};
51use parking_lot::Mutex;
52use pending_op::{PendingOp, PendingOpId, PendingOps, PendingOpsSummary};
53use postage::stream::Stream as _;
54use rpc::{
55 AnyProtoClient, TypedEnvelope,
56 proto::{self, git_reset, split_repository_update},
57};
58use serde::Deserialize;
59use settings::WorktreeId;
60use smol::future::yield_now;
61use std::{
62 cmp::Ordering,
63 collections::{BTreeSet, HashSet, VecDeque},
64 future::Future,
65 mem,
66 ops::Range,
67 path::{Path, PathBuf},
68 str::FromStr,
69 sync::{
70 Arc,
71 atomic::{self, AtomicU64},
72 },
73 time::Instant,
74};
75use sum_tree::{Edit, SumTree, TreeSet};
76use task::Shell;
77use text::{Bias, BufferId};
78use util::{
79 ResultExt, debug_panic,
80 paths::{PathStyle, SanitizedPath},
81 post_inc,
82 rel_path::RelPath,
83};
84use worktree::{
85 File, PathChange, PathKey, PathProgress, PathSummary, PathTarget, ProjectEntryId,
86 UpdatedGitRepositoriesSet, UpdatedGitRepository, Worktree,
87};
88use zeroize::Zeroize;
89
/// Central store for all git repositories associated with a project, along
/// with per-buffer diff state and diffs shared with remote collaborators.
pub struct GitStore {
    // Local vs. remote backing state; see `GitStoreState`.
    state: GitStoreState,
    buffer_store: Entity<BufferStore>,
    worktree_store: Entity<WorktreeStore>,
    // All known repositories, keyed by id.
    repositories: HashMap<RepositoryId, Entity<Repository>>,
    // Worktrees associated with each repository.
    worktree_ids: HashMap<RepositoryId, HashSet<WorktreeId>>,
    active_repo_id: Option<RepositoryId>,
    #[allow(clippy::type_complexity)]
    // In-flight diff-loading tasks, memoized per (buffer, diff kind) so
    // concurrent callers share one load.
    loading_diffs:
        HashMap<(BufferId, DiffKind), Shared<Task<Result<Entity<BufferDiff>, Arc<anyhow::Error>>>>>,
    // Per-buffer git state (diffs, conflict sets, cached base texts).
    diffs: HashMap<BufferId, Entity<BufferGitState>>,
    // Diffs that have been shared with downstream peers, per peer.
    shared_diffs: HashMap<proto::PeerId, HashMap<BufferId, SharedDiffs>>,
    _subscriptions: Vec<Subscription>,
}
104
/// Diffs for a single buffer that have been shared with a downstream peer.
#[derive(Default)]
struct SharedDiffs {
    unstaged: Option<Entity<BufferDiff>>,
    uncommitted: Option<Entity<BufferDiff>>,
}

/// Per-buffer git state: weak handles to the buffer's diffs and conflict
/// set, cached base texts, and bookkeeping for in-flight recalculations.
struct BufferGitState {
    unstaged_diff: Option<WeakEntity<BufferDiff>>,
    uncommitted_diff: Option<WeakEntity<BufferDiff>>,
    // Diffs against an arbitrary commit, keyed by optional oid (see
    // `GitStore::open_diff_since`).
    oid_diffs: HashMap<Option<git::Oid>, WeakEntity<BufferDiff>>,
    conflict_set: Option<WeakEntity<ConflictSet>>,
    recalculate_diff_task: Option<Task<Result<()>>>,
    reparse_conflict_markers_task: Option<Task<Result<()>>>,
    language: Option<Arc<Language>>,
    language_registry: Option<Arc<LanguageRegistry>>,
    // Senders notified when conflict markers have been (re)parsed.
    conflict_updated_futures: Vec<oneshot::Sender<()>>,
    // Watch channel signaling whether a diff recalculation is in progress.
    recalculating_tx: postage::watch::Sender<bool>,

    /// These operation counts are used to ensure that head and index text
    /// values read from the git repository are up-to-date with any hunk staging
    /// operations that have been performed on the BufferDiff.
    ///
    /// The operation count is incremented immediately when the user initiates a
    /// hunk stage/unstage operation. Then, upon finishing writing the new index
    /// text to disk, the `operation count as of write` is updated to reflect
    /// the operation count that prompted the write.
    hunk_staging_operation_count: usize,
    hunk_staging_operation_count_as_of_write: usize,

    head_text: Option<Arc<str>>,
    index_text: Option<Arc<str>>,
    // Cached base texts for oid diffs, keyed by commit oid.
    oid_texts: HashMap<git::Oid, Arc<str>>,
    head_changed: bool,
    index_changed: bool,
    language_changed: bool,
}
141
/// Describes which of a buffer's diff base texts changed.
#[derive(Clone, Debug)]
enum DiffBasesChange {
    /// Only the index (staged) text changed.
    SetIndex(Option<String>),
    /// Only the HEAD (committed) text changed.
    SetHead(Option<String>),
    /// Index and HEAD changed to distinct values.
    SetEach {
        index: Option<String>,
        head: Option<String>,
    },
    /// Index and HEAD changed to the same value.
    SetBoth(Option<String>),
}

/// Identifies which kind of diff is being loaded for a buffer.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum DiffKind {
    Unstaged,
    Uncommitted,
    SinceOid(Option<git::Oid>),
}
159
/// Whether this `GitStore` is backed by local repositories or by an
/// upstream collaborator, plus any downstream peers updates are forwarded to.
enum GitStoreState {
    Local {
        // Source of fresh `RepositoryId`s for newly discovered repositories.
        next_repository_id: Arc<AtomicU64>,
        downstream: Option<LocalDownstreamState>,
        project_environment: Entity<ProjectEnvironment>,
        fs: Arc<dyn Fs>,
    },
    Remote {
        upstream_client: AnyProtoClient,
        upstream_project_id: u64,
        downstream: Option<(AnyProtoClient, ProjectId)>,
    },
}

/// A repository change queued for forwarding to a downstream peer.
enum DownstreamUpdate {
    UpdateRepository(RepositorySnapshot),
    RemoveRepository(RepositoryId),
}

/// State kept while a local project is shared with a downstream peer.
struct LocalDownstreamState {
    client: AnyProtoClient,
    project_id: ProjectId,
    // Feeds repository updates to the background task that serializes and
    // sends them (see `GitStore::shared`).
    updates_tx: mpsc::UnboundedSender<DownstreamUpdate>,
    _task: Task<Result<()>>,
}

/// A checkpoint of every repository in the store, keyed by each
/// repository's working-directory absolute path.
#[derive(Clone, Debug)]
pub struct GitStoreCheckpoint {
    checkpoints_by_work_dir_abs_path: HashMap<Arc<Path>, GitRepositoryCheckpoint>,
}
190
/// A path within a repository together with its git status.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StatusEntry {
    pub repo_path: RepoPath,
    pub status: FileStatus,
}
196
197impl StatusEntry {
198 fn to_proto(&self) -> proto::StatusEntry {
199 let simple_status = match self.status {
200 FileStatus::Ignored | FileStatus::Untracked => proto::GitStatus::Added as i32,
201 FileStatus::Unmerged { .. } => proto::GitStatus::Conflict as i32,
202 FileStatus::Tracked(TrackedStatus {
203 index_status,
204 worktree_status,
205 }) => tracked_status_to_proto(if worktree_status != StatusCode::Unmodified {
206 worktree_status
207 } else {
208 index_status
209 }),
210 };
211
212 proto::StatusEntry {
213 repo_path: self.repo_path.to_proto(),
214 simple_status,
215 status: Some(status_to_proto(self.status)),
216 }
217 }
218}
219
220impl TryFrom<proto::StatusEntry> for StatusEntry {
221 type Error = anyhow::Error;
222
223 fn try_from(value: proto::StatusEntry) -> Result<Self, Self::Error> {
224 let repo_path = RepoPath::from_proto(&value.repo_path).context("invalid repo path")?;
225 let status = status_from_proto(value.simple_status, value.status)?;
226 Ok(Self { repo_path, status })
227 }
228}
229
/// Stores `StatusEntry` values in a `SumTree`, summarizing the aggregate
/// git status and maximum path of each subtree.
impl sum_tree::Item for StatusEntry {
    type Summary = PathSummary<GitSummary>;

    fn summary(&self, _: <Self::Summary as sum_tree::Summary>::Context<'_>) -> Self::Summary {
        PathSummary {
            max_path: self.repo_path.as_ref().clone(),
            item_summary: self.status.summary(),
        }
    }
}

impl sum_tree::KeyedItem for StatusEntry {
    type Key = PathKey;

    /// Entries are keyed (and therefore ordered) by repository-relative path.
    fn key(&self) -> Self::Key {
        PathKey(self.repo_path.as_ref().clone())
    }
}
248
/// Unique identifier for a repository within a project.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct RepositoryId(pub u64);

/// Details of an in-progress merge: conflicted paths, the merge message
/// (if any), and the merge heads.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct MergeDetails {
    pub conflicted_paths: TreeSet<RepoPath>,
    pub message: Option<SharedString>,
    pub heads: Vec<Option<SharedString>>,
}

/// Commit data for the git graph, which may still be loading.
#[derive(Clone)]
pub enum CommitDataState {
    Loading,
    Loaded(Arc<GraphCommitData>),
}
264
/// An immutable snapshot of a repository's state: file statuses, branch,
/// head commit, merge details, remote URLs, and stash entries.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RepositorySnapshot {
    pub id: RepositoryId,
    /// Git statuses of files, ordered by repository-relative path.
    pub statuses_by_path: SumTree<StatusEntry>,
    pub work_directory_abs_path: Arc<Path>,
    pub path_style: PathStyle,
    pub branch: Option<Branch>,
    pub head_commit: Option<CommitDetails>,
    // NOTE(review): presumably bumped on each status scan — confirm with
    // the scanning code.
    pub scan_id: u64,
    pub merge: MergeDetails,
    pub remote_origin_url: Option<String>,
    pub remote_upstream_url: Option<String>,
    pub stash_entries: GitStash,
}

/// Identifier for a queued or running git job within a `Repository`.
type JobId = u64;

/// Metadata describing a running git job, suitable for progress display.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct JobInfo {
    pub start: Instant,
    pub message: SharedString,
}
287
/// Background handler that serves requests for per-commit data used by the
/// git graph.
struct GraphCommitDataHandler {
    _task: Task<()>,
    // Channel used to request data for a specific commit.
    commit_data_request: smol::channel::Sender<Oid>,
}

/// Lifecycle of the graph-commit-data handler.
enum GraphCommitHandlerState {
    Starting,
    Open(GraphCommitDataHandler),
    Closed,
}
298
/// A single git repository tracked by the `GitStore`. Git operations are
/// funneled through `job_sender`; snapshot fields are readable via `Deref`.
pub struct Repository {
    this: WeakEntity<Self>,
    snapshot: RepositorySnapshot,
    commit_message_buffer: Option<Entity<Buffer>>,
    git_store: WeakEntity<GitStore>,
    // For a local repository, holds paths that have had worktree events since the last status scan completed,
    // and that should be examined during the next status scan.
    paths_needing_status_update: Vec<Vec<RepoPath>>,
    // Channel used to submit git jobs for execution.
    job_sender: mpsc::UnboundedSender<GitJob>,
    // Currently running jobs, keyed by job id.
    active_jobs: HashMap<JobId, JobInfo>,
    pending_ops: SumTree<PendingOps>,
    job_id: JobId,
    askpass_delegates: Arc<Mutex<HashMap<u64, AskPassDelegate>>>,
    latest_askpass_id: u64,
    // Lazily resolved backend (local git backend or remote client).
    repository_state: Shared<Task<Result<RepositoryState, String>>>,
    pub initial_graph_data: HashMap<
        (LogOrder, LogSource),
        (
            Task<Result<(), SharedString>>,
            Vec<Arc<InitialGraphCommitData>>,
        ),
    >,
    graph_commit_data_handler: GraphCommitHandlerState,
    commit_data: HashMap<Oid, CommitDataState>,
}

/// Exposes the snapshot's fields directly on `Repository`.
impl std::ops::Deref for Repository {
    type Target = RepositorySnapshot;

    fn deref(&self) -> &Self::Target {
        &self.snapshot
    }
}
332
/// Backend state for a repository on the local machine.
#[derive(Clone)]
pub struct LocalRepositoryState {
    pub fs: Arc<dyn Fs>,
    pub backend: Arc<dyn GitRepository>,
    /// Environment captured from the repository's working directory.
    pub environment: Arc<HashMap<String, String>>,
}
339
impl LocalRepositoryState {
    /// Opens the repository at `dot_git_abs_path`, first resolving the
    /// working directory's shell environment so the correct `git` binary
    /// can be located via the resulting `PATH`.
    async fn new(
        work_directory_abs_path: Arc<Path>,
        dot_git_abs_path: Arc<Path>,
        project_environment: WeakEntity<ProjectEnvironment>,
        fs: Arc<dyn Fs>,
        cx: &mut AsyncApp,
    ) -> anyhow::Result<Self> {
        // Resolve the directory environment; on failure, log and proceed
        // with an empty environment rather than erroring out.
        let environment = project_environment
            .update(cx, |project_environment, cx| {
                project_environment.local_directory_environment(&Shell::System, work_directory_abs_path.clone(), cx)
            })?
            .await
            .unwrap_or_else(|| {
                log::error!("failed to get working directory environment for repository {work_directory_abs_path:?}");
                HashMap::default()
            });
        let search_paths = environment.get("PATH").map(|val| val.to_owned());
        let backend = cx
            .background_spawn({
                let fs = fs.clone();
                async move {
                    // Prefer a `git` found on the project environment's PATH;
                    // fall back to the process's own PATH.
                    let system_git_binary_path = search_paths
                        .and_then(|search_paths| {
                            which::which_in("git", Some(search_paths), &work_directory_abs_path)
                                .ok()
                        })
                        .or_else(|| which::which("git").ok());
                    fs.open_repo(&dot_git_abs_path, system_git_binary_path.as_deref())
                        .with_context(|| format!("opening repository at {dot_git_abs_path:?}"))
                }
            })
            .await?;
        Ok(LocalRepositoryState {
            backend,
            environment: Arc::new(environment),
            fs,
        })
    }
}
380
/// Backend state for a repository hosted by an upstream collaborator.
#[derive(Clone)]
pub struct RemoteRepositoryState {
    pub project_id: ProjectId,
    pub client: AnyProtoClient,
}

/// Where a repository's git operations are actually executed.
#[derive(Clone)]
pub enum RepositoryState {
    Local(LocalRepositoryState),
    Remote(RemoteRepositoryState),
}

/// Events emitted by a `Repository` when parts of its snapshot change.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RepositoryEvent {
    StatusesChanged,
    MergeHeadsChanged,
    BranchChanged,
    StashEntriesChanged,
    PendingOpsChanged { pending_ops: SumTree<PendingOps> },
    GitGraphCountUpdated((LogOrder, LogSource), usize),
}

/// Emitted when the set of active git jobs changes.
#[derive(Clone, Debug)]
pub struct JobsUpdated;

/// Events emitted by the `GitStore` itself.
#[derive(Debug)]
pub enum GitStoreEvent {
    ActiveRepositoryChanged(Option<RepositoryId>),
    /// Bool is true when the repository that's updated is the active repository
    RepositoryUpdated(RepositoryId, RepositoryEvent, bool),
    RepositoryAdded,
    RepositoryRemoved(RepositoryId),
    IndexWriteError(anyhow::Error),
    JobsUpdated,
    ConflictsUpdated,
}

impl EventEmitter<RepositoryEvent> for Repository {}
impl EventEmitter<JobsUpdated> for Repository {}
impl EventEmitter<GitStoreEvent> for GitStore {}

/// A unit of git work to run against the repository's backend state.
pub struct GitJob {
    job: Box<dyn FnOnce(RepositoryState, &mut AsyncApp) -> Task<()>>,
    // NOTE(review): key appears to identify coalescable/serializable job
    // classes — confirm with the job-queue logic.
    key: Option<GitJobKey>,
}

/// Keys identifying classes of git jobs.
#[derive(PartialEq, Eq)]
enum GitJobKey {
    WriteIndex(Vec<RepoPath>),
    ReloadBufferDiffBases,
    RefreshStatuses,
    ReloadGitState,
}
434
435impl GitStore {
436 pub fn local(
437 worktree_store: &Entity<WorktreeStore>,
438 buffer_store: Entity<BufferStore>,
439 environment: Entity<ProjectEnvironment>,
440 fs: Arc<dyn Fs>,
441 cx: &mut Context<Self>,
442 ) -> Self {
443 Self::new(
444 worktree_store.clone(),
445 buffer_store,
446 GitStoreState::Local {
447 next_repository_id: Arc::new(AtomicU64::new(1)),
448 downstream: None,
449 project_environment: environment,
450 fs,
451 },
452 cx,
453 )
454 }
455
456 pub fn remote(
457 worktree_store: &Entity<WorktreeStore>,
458 buffer_store: Entity<BufferStore>,
459 upstream_client: AnyProtoClient,
460 project_id: u64,
461 cx: &mut Context<Self>,
462 ) -> Self {
463 Self::new(
464 worktree_store.clone(),
465 buffer_store,
466 GitStoreState::Remote {
467 upstream_client,
468 upstream_project_id: project_id,
469 downstream: None,
470 },
471 cx,
472 )
473 }
474
475 fn new(
476 worktree_store: Entity<WorktreeStore>,
477 buffer_store: Entity<BufferStore>,
478 state: GitStoreState,
479 cx: &mut Context<Self>,
480 ) -> Self {
481 let _subscriptions = vec![
482 cx.subscribe(&worktree_store, Self::on_worktree_store_event),
483 cx.subscribe(&buffer_store, Self::on_buffer_store_event),
484 ];
485
486 GitStore {
487 state,
488 buffer_store,
489 worktree_store,
490 repositories: HashMap::default(),
491 worktree_ids: HashMap::default(),
492 active_repo_id: None,
493 _subscriptions,
494 loading_diffs: HashMap::default(),
495 shared_diffs: HashMap::default(),
496 diffs: HashMap::default(),
497 }
498 }
499
    /// Registers all git-related RPC message handlers on the given client.
    pub fn init(client: &AnyProtoClient) {
        client.add_entity_request_handler(Self::handle_get_remotes);
        client.add_entity_request_handler(Self::handle_get_branches);
        client.add_entity_request_handler(Self::handle_get_default_branch);
        client.add_entity_request_handler(Self::handle_change_branch);
        client.add_entity_request_handler(Self::handle_create_branch);
        client.add_entity_request_handler(Self::handle_rename_branch);
        client.add_entity_request_handler(Self::handle_create_remote);
        client.add_entity_request_handler(Self::handle_remove_remote);
        client.add_entity_request_handler(Self::handle_delete_branch);
        client.add_entity_request_handler(Self::handle_git_init);
        client.add_entity_request_handler(Self::handle_push);
        client.add_entity_request_handler(Self::handle_pull);
        client.add_entity_request_handler(Self::handle_fetch);
        client.add_entity_request_handler(Self::handle_stage);
        client.add_entity_request_handler(Self::handle_unstage);
        client.add_entity_request_handler(Self::handle_stash);
        client.add_entity_request_handler(Self::handle_stash_pop);
        client.add_entity_request_handler(Self::handle_stash_apply);
        client.add_entity_request_handler(Self::handle_stash_drop);
        client.add_entity_request_handler(Self::handle_commit);
        client.add_entity_request_handler(Self::handle_run_hook);
        client.add_entity_request_handler(Self::handle_reset);
        client.add_entity_request_handler(Self::handle_show);
        client.add_entity_request_handler(Self::handle_load_commit_diff);
        client.add_entity_request_handler(Self::handle_file_history);
        client.add_entity_request_handler(Self::handle_checkout_files);
        client.add_entity_request_handler(Self::handle_open_commit_message_buffer);
        client.add_entity_request_handler(Self::handle_set_index_text);
        client.add_entity_request_handler(Self::handle_askpass);
        client.add_entity_request_handler(Self::handle_check_for_pushed_commits);
        client.add_entity_request_handler(Self::handle_git_diff);
        client.add_entity_request_handler(Self::handle_tree_diff);
        client.add_entity_request_handler(Self::handle_get_blob_content);
        client.add_entity_request_handler(Self::handle_open_unstaged_diff);
        client.add_entity_request_handler(Self::handle_open_uncommitted_diff);
        client.add_entity_message_handler(Self::handle_update_diff_bases);
        client.add_entity_request_handler(Self::handle_get_permalink_to_line);
        client.add_entity_request_handler(Self::handle_blame_buffer);
        client.add_entity_message_handler(Self::handle_update_repository);
        client.add_entity_message_handler(Self::handle_remove_repository);
        client.add_entity_request_handler(Self::handle_git_clone);
        client.add_entity_request_handler(Self::handle_get_worktrees);
        client.add_entity_request_handler(Self::handle_create_worktree);
    }
545
546 pub fn is_local(&self) -> bool {
547 matches!(self.state, GitStoreState::Local { .. })
548 }
549 pub fn set_active_repo_for_path(&mut self, project_path: &ProjectPath, cx: &mut Context<Self>) {
550 if let Some((repo, _)) = self.repository_and_path_for_project_path(project_path, cx) {
551 let id = repo.read(cx).id;
552 if self.active_repo_id != Some(id) {
553 self.active_repo_id = Some(id);
554 cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
555 }
556 }
557 }
558
    /// Begins sharing this store with a downstream peer. For a remote store
    /// the current repository snapshots are sent immediately; for a local
    /// store a background task is spawned that streams repository updates
    /// (as deltas after the initial state) until unshared.
    pub fn shared(&mut self, project_id: u64, client: AnyProtoClient, cx: &mut Context<Self>) {
        match &mut self.state {
            GitStoreState::Remote {
                downstream: downstream_client,
                ..
            } => {
                // Send each repository's full initial state, split into
                // size-bounded messages.
                for repo in self.repositories.values() {
                    let update = repo.read(cx).snapshot.initial_update(project_id);
                    for update in split_repository_update(update) {
                        client.send(update).log_err();
                    }
                }
                *downstream_client = Some((client, ProjectId(project_id)));
            }
            GitStoreState::Local {
                downstream: downstream_client,
                ..
            } => {
                // Tracks the last snapshot sent per repository so later
                // updates can be sent as deltas.
                let mut snapshots = HashMap::default();
                let (updates_tx, mut updates_rx) = mpsc::unbounded();
                // Seed the channel with every current repository snapshot.
                for repo in self.repositories.values() {
                    updates_tx
                        .unbounded_send(DownstreamUpdate::UpdateRepository(
                            repo.read(cx).snapshot.clone(),
                        ))
                        .ok();
                }
                *downstream_client = Some(LocalDownstreamState {
                    client: client.clone(),
                    project_id: ProjectId(project_id),
                    updates_tx,
                    _task: cx.spawn(async move |this, cx| {
                        cx.background_spawn(async move {
                            while let Some(update) = updates_rx.next().await {
                                match update {
                                    DownstreamUpdate::UpdateRepository(snapshot) => {
                                        if let Some(old_snapshot) = snapshots.get_mut(&snapshot.id)
                                        {
                                            // Known repository: send a delta
                                            // against the last-sent snapshot.
                                            let update =
                                                snapshot.build_update(old_snapshot, project_id);
                                            *old_snapshot = snapshot;
                                            for update in split_repository_update(update) {
                                                client.send(update)?;
                                            }
                                        } else {
                                            // New repository: send its full
                                            // initial state.
                                            let update = snapshot.initial_update(project_id);
                                            for update in split_repository_update(update) {
                                                client.send(update)?;
                                            }
                                            snapshots.insert(snapshot.id, snapshot);
                                        }
                                    }
                                    DownstreamUpdate::RemoveRepository(id) => {
                                        client.send(proto::RemoveRepository {
                                            project_id,
                                            id: id.to_proto(),
                                        })?;
                                    }
                                }
                            }
                            anyhow::Ok(())
                        })
                        .await
                        .ok();
                        // The update stream ended (sender dropped or a send
                        // failed): clear the downstream state.
                        this.update(cx, |this, _| {
                            if let GitStoreState::Local {
                                downstream: downstream_client,
                                ..
                            } = &mut this.state
                            {
                                downstream_client.take();
                            } else {
                                unreachable!("unshared called on remote store");
                            }
                        })
                    }),
                });
            }
        }
    }
639
640 pub fn unshared(&mut self, _cx: &mut Context<Self>) {
641 match &mut self.state {
642 GitStoreState::Local {
643 downstream: downstream_client,
644 ..
645 } => {
646 downstream_client.take();
647 }
648 GitStoreState::Remote {
649 downstream: downstream_client,
650 ..
651 } => {
652 downstream_client.take();
653 }
654 }
655 self.shared_diffs.clear();
656 }
657
    /// Drops all diffs shared with the given peer (e.g. when it disconnects).
    pub(crate) fn forget_shared_diffs_for(&mut self, peer_id: &proto::PeerId) {
        self.shared_diffs.remove(peer_id);
    }
661
662 pub fn active_repository(&self) -> Option<Entity<Repository>> {
663 self.active_repo_id
664 .as_ref()
665 .map(|id| self.repositories[id].clone())
666 }
667
    /// Opens (or returns the already-open) diff of `buffer`'s contents
    /// against the git index. Concurrent calls share one loading task.
    pub fn open_unstaged_diff(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<BufferDiff>>> {
        let buffer_id = buffer.read(cx).remote_id();
        // Fast path: the diff already exists; just wait for any in-flight
        // recalculation before handing it back.
        if let Some(diff_state) = self.diffs.get(&buffer_id)
            && let Some(unstaged_diff) = diff_state
                .read(cx)
                .unstaged_diff
                .as_ref()
                .and_then(|weak| weak.upgrade())
        {
            if let Some(task) =
                diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
            {
                return cx.background_executor().spawn(async move {
                    task.await;
                    Ok(unstaged_diff)
                });
            }
            return Task::ready(Ok(unstaged_diff));
        }

        let Some((repo, repo_path)) =
            self.repository_and_path_for_buffer_id(buffer.read(cx).remote_id(), cx)
        else {
            return Task::ready(Err(anyhow!("failed to find git repository for buffer")));
        };

        // Slow path: load the staged text and build the diff, memoizing the
        // task in `loading_diffs` so concurrent callers share it.
        let task = self
            .loading_diffs
            .entry((buffer_id, DiffKind::Unstaged))
            .or_insert_with(|| {
                let staged_text = repo.update(cx, |repo, cx| {
                    repo.load_staged_text(buffer_id, repo_path, cx)
                });
                cx.spawn(async move |this, cx| {
                    Self::open_diff_internal(
                        this,
                        DiffKind::Unstaged,
                        staged_text.await.map(DiffBasesChange::SetIndex),
                        buffer,
                        cx,
                    )
                    .await
                    .map_err(Arc::new)
                })
                .shared()
            })
            .clone();

        cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
722
    /// Opens (or returns the already-open) diff of `buffer` against the
    /// blob at `oid` in `repo`; `None` opens a diff with no base content.
    /// Concurrent calls share one loading task.
    pub fn open_diff_since(
        &mut self,
        oid: Option<git::Oid>,
        buffer: Entity<Buffer>,
        repo: Entity<Repository>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<BufferDiff>>> {
        let buffer_id = buffer.read(cx).remote_id();

        // Fast path: the diff already exists; wait out any in-flight
        // recalculation first.
        if let Some(diff_state) = self.diffs.get(&buffer_id)
            && let Some(oid_diff) = diff_state.read(cx).oid_diff(oid)
        {
            if let Some(task) =
                diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
            {
                return cx.background_executor().spawn(async move {
                    task.await;
                    Ok(oid_diff)
                });
            }
            return Task::ready(Ok(oid_diff));
        }

        // Join an in-flight load if one exists.
        let diff_kind = DiffKind::SinceOid(oid);
        if let Some(task) = self.loading_diffs.get(&(buffer_id, diff_kind)) {
            let task = task.clone();
            return cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) });
        }

        let task = cx
            .spawn(async move |this, cx| {
                let result: Result<Entity<BufferDiff>> = async {
                    let buffer_snapshot = buffer.update(cx, |buffer, _| buffer.snapshot());
                    let language_registry =
                        buffer.update(cx, |buffer, _| buffer.language_registry());
                    // Load the base text from the repository, unless no
                    // commit was given.
                    let content: Option<Arc<str>> = match oid {
                        None => None,
                        Some(oid) => Some(
                            repo.update(cx, |repo, cx| repo.load_blob_content(oid, cx))
                                .await?
                                .into(),
                        ),
                    };
                    let buffer_diff = cx.new(|cx| BufferDiff::new(&buffer_snapshot, cx));

                    buffer_diff
                        .update(cx, |buffer_diff, cx| {
                            buffer_diff.language_changed(
                                buffer_snapshot.language().cloned(),
                                language_registry,
                                cx,
                            );
                            buffer_diff.set_base_text(
                                content.clone(),
                                buffer_snapshot.language().cloned(),
                                buffer_snapshot.text,
                                cx,
                            )
                        })
                        .await?;
                    // Attach the buffer's unstaged diff as the secondary diff.
                    let unstaged_diff = this
                        .update(cx, |this, cx| this.open_unstaged_diff(buffer.clone(), cx))?
                        .await?;
                    buffer_diff.update(cx, |buffer_diff, _| {
                        buffer_diff.set_secondary_diff(unstaged_diff);
                    });

                    this.update(cx, |this, cx| {
                        cx.subscribe(&buffer_diff, Self::on_buffer_diff_event)
                            .detach();

                        this.loading_diffs.remove(&(buffer_id, diff_kind));

                        let git_store = cx.weak_entity();
                        let diff_state = this
                            .diffs
                            .entry(buffer_id)
                            .or_insert_with(|| cx.new(|_| BufferGitState::new(git_store)));

                        // Cache the base text and register the new diff under
                        // its oid.
                        diff_state.update(cx, |state, _| {
                            if let Some(oid) = oid {
                                if let Some(content) = content {
                                    state.oid_texts.insert(oid, content);
                                }
                            }
                            state.oid_diffs.insert(oid, buffer_diff.downgrade());
                        });
                    })?;

                    Ok(buffer_diff)
                }
                .await;
                result.map_err(Arc::new)
            })
            .shared();

        self.loading_diffs
            .insert((buffer_id, diff_kind), task.clone());
        cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
823
    /// Opens (or returns the already-open) diff of `buffer`'s contents
    /// against HEAD. Concurrent calls share one loading task.
    #[ztracing::instrument(skip_all)]
    pub fn open_uncommitted_diff(
        &mut self,
        buffer: Entity<Buffer>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<BufferDiff>>> {
        let buffer_id = buffer.read(cx).remote_id();

        // Fast path: reuse a live diff, waiting for any in-flight
        // recalculation.
        if let Some(diff_state) = self.diffs.get(&buffer_id)
            && let Some(uncommitted_diff) = diff_state
                .read(cx)
                .uncommitted_diff
                .as_ref()
                .and_then(|weak| weak.upgrade())
        {
            if let Some(task) =
                diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
            {
                return cx.background_executor().spawn(async move {
                    task.await;
                    Ok(uncommitted_diff)
                });
            }
            return Task::ready(Ok(uncommitted_diff));
        }

        let Some((repo, repo_path)) =
            self.repository_and_path_for_buffer_id(buffer.read(cx).remote_id(), cx)
        else {
            return Task::ready(Err(anyhow!("failed to find git repository for buffer")));
        };

        // Slow path: load the committed text and build the diff, memoizing
        // the task so concurrent callers share it.
        let task = self
            .loading_diffs
            .entry((buffer_id, DiffKind::Uncommitted))
            .or_insert_with(|| {
                let changes = repo.update(cx, |repo, cx| {
                    repo.load_committed_text(buffer_id, repo_path, cx)
                });

                // todo(lw): hot foreground spawn
                cx.spawn(async move |this, cx| {
                    Self::open_diff_internal(this, DiffKind::Uncommitted, changes.await, buffer, cx)
                        .await
                        .map_err(Arc::new)
                })
                .shared()
            })
            .clone();

        cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
    }
876
    /// Shared tail of `open_unstaged_diff` / `open_uncommitted_diff`:
    /// installs the loaded base text into the buffer's git state, creates
    /// the `BufferDiff`, and waits for its first recalculation.
    #[ztracing::instrument(skip_all)]
    async fn open_diff_internal(
        this: WeakEntity<Self>,
        kind: DiffKind,
        texts: Result<DiffBasesChange>,
        buffer_entity: Entity<Buffer>,
        cx: &mut AsyncApp,
    ) -> Result<Entity<BufferDiff>> {
        // On a load failure, clear the memoized loading task so a later
        // call can retry, then propagate the error.
        let diff_bases_change = match texts {
            Err(e) => {
                this.update(cx, |this, cx| {
                    let buffer = buffer_entity.read(cx);
                    let buffer_id = buffer.remote_id();
                    this.loading_diffs.remove(&(buffer_id, kind));
                })?;
                return Err(e);
            }
            Ok(change) => change,
        };

        this.update(cx, |this, cx| {
            let buffer = buffer_entity.read(cx);
            let buffer_id = buffer.remote_id();
            let language = buffer.language().cloned();
            let language_registry = buffer.language_registry();
            let text_snapshot = buffer.text_snapshot();
            this.loading_diffs.remove(&(buffer_id, kind));

            let git_store = cx.weak_entity();
            let diff_state = this
                .diffs
                .entry(buffer_id)
                .or_insert_with(|| cx.new(|_| BufferGitState::new(git_store)));

            let diff = cx.new(|cx| BufferDiff::new(&text_snapshot, cx));

            cx.subscribe(&diff, Self::on_buffer_diff_event).detach();
            diff_state.update(cx, |diff_state, cx| {
                diff_state.language_changed = true;
                diff_state.language = language;
                diff_state.language_registry = language_registry;

                match kind {
                    DiffKind::Unstaged => diff_state.unstaged_diff = Some(diff.downgrade()),
                    DiffKind::Uncommitted => {
                        // An uncommitted diff carries the unstaged diff as
                        // its secondary; reuse a live one or create it.
                        let unstaged_diff = if let Some(diff) = diff_state.unstaged_diff() {
                            diff
                        } else {
                            let unstaged_diff = cx.new(|cx| BufferDiff::new(&text_snapshot, cx));
                            diff_state.unstaged_diff = Some(unstaged_diff.downgrade());
                            unstaged_diff
                        };

                        diff.update(cx, |diff, _| diff.set_secondary_diff(unstaged_diff));
                        diff_state.uncommitted_diff = Some(diff.downgrade())
                    }
                    DiffKind::SinceOid(_) => {
                        unreachable!("open_diff_internal is not used for OID diffs")
                    }
                }

                // Apply the new base text and, if that kicked off a
                // recalculation, wait for it before returning the diff.
                diff_state.diff_bases_changed(text_snapshot, Some(diff_bases_change), cx);
                let rx = diff_state.wait_for_recalculation();

                anyhow::Ok(async move {
                    if let Some(rx) = rx {
                        rx.await;
                    }
                    Ok(diff)
                })
            })
        })??
        .await
    }
951
952 pub fn get_unstaged_diff(&self, buffer_id: BufferId, cx: &App) -> Option<Entity<BufferDiff>> {
953 let diff_state = self.diffs.get(&buffer_id)?;
954 diff_state.read(cx).unstaged_diff.as_ref()?.upgrade()
955 }
956
957 pub fn get_uncommitted_diff(
958 &self,
959 buffer_id: BufferId,
960 cx: &App,
961 ) -> Option<Entity<BufferDiff>> {
962 let diff_state = self.diffs.get(&buffer_id)?;
963 diff_state.read(cx).uncommitted_diff.as_ref()?.upgrade()
964 }
965
966 pub fn get_diff_since_oid(
967 &self,
968 buffer_id: BufferId,
969 oid: Option<git::Oid>,
970 cx: &App,
971 ) -> Option<Entity<BufferDiff>> {
972 let diff_state = self.diffs.get(&buffer_id)?;
973 diff_state.read(cx).oid_diff(oid)
974 }
975
976 pub fn open_conflict_set(
977 &mut self,
978 buffer: Entity<Buffer>,
979 cx: &mut Context<Self>,
980 ) -> Entity<ConflictSet> {
981 log::debug!("open conflict set");
982 let buffer_id = buffer.read(cx).remote_id();
983
984 if let Some(git_state) = self.diffs.get(&buffer_id)
985 && let Some(conflict_set) = git_state
986 .read(cx)
987 .conflict_set
988 .as_ref()
989 .and_then(|weak| weak.upgrade())
990 {
991 let conflict_set = conflict_set;
992 let buffer_snapshot = buffer.read(cx).text_snapshot();
993
994 git_state.update(cx, |state, cx| {
995 let _ = state.reparse_conflict_markers(buffer_snapshot, cx);
996 });
997
998 return conflict_set;
999 }
1000
1001 let is_unmerged = self
1002 .repository_and_path_for_buffer_id(buffer_id, cx)
1003 .is_some_and(|(repo, path)| repo.read(cx).snapshot.has_conflict(&path));
1004 let git_store = cx.weak_entity();
1005 let buffer_git_state = self
1006 .diffs
1007 .entry(buffer_id)
1008 .or_insert_with(|| cx.new(|_| BufferGitState::new(git_store)));
1009 let conflict_set = cx.new(|cx| ConflictSet::new(buffer_id, is_unmerged, cx));
1010
1011 self._subscriptions
1012 .push(cx.subscribe(&conflict_set, |_, _, _, cx| {
1013 cx.emit(GitStoreEvent::ConflictsUpdated);
1014 }));
1015
1016 buffer_git_state.update(cx, |state, cx| {
1017 state.conflict_set = Some(conflict_set.downgrade());
1018 let buffer_snapshot = buffer.read(cx).text_snapshot();
1019 let _ = state.reparse_conflict_markers(buffer_snapshot, cx);
1020 });
1021
1022 conflict_set
1023 }
1024
1025 pub fn project_path_git_status(
1026 &self,
1027 project_path: &ProjectPath,
1028 cx: &App,
1029 ) -> Option<FileStatus> {
1030 let (repo, repo_path) = self.repository_and_path_for_project_path(project_path, cx)?;
1031 Some(repo.read(cx).status_for_path(&repo_path)?.status)
1032 }
1033
    /// Captures a checkpoint of every repository in the store, keyed by
    /// working-directory path. Fails if any repository fails to checkpoint.
    pub fn checkpoint(&self, cx: &mut App) -> Task<Result<GitStoreCheckpoint>> {
        let mut work_directory_abs_paths = Vec::new();
        let mut checkpoints = Vec::new();
        for repository in self.repositories.values() {
            repository.update(cx, |repository, _| {
                work_directory_abs_paths.push(repository.snapshot.work_directory_abs_path.clone());
                // NOTE(review): `map(|checkpoint| checkpoint?)` appears to
                // flatten a nested Result from the checkpoint future — confirm.
                checkpoints.push(repository.checkpoint().map(|checkpoint| checkpoint?));
            });
        }

        cx.background_executor().spawn(async move {
            let checkpoints = future::try_join_all(checkpoints).await?;
            Ok(GitStoreCheckpoint {
                checkpoints_by_work_dir_abs_path: work_directory_abs_paths
                    .into_iter()
                    .zip(checkpoints)
                    .collect(),
            })
        })
    }
1054
    /// Restores each repository to the state recorded in `checkpoint`.
    /// Checkpoints whose working directory has no matching repository are
    /// silently skipped.
    pub fn restore_checkpoint(
        &self,
        checkpoint: GitStoreCheckpoint,
        cx: &mut App,
    ) -> Task<Result<()>> {
        // Index repositories by working directory so each checkpoint can
        // find its repository.
        let repositories_by_work_dir_abs_path = self
            .repositories
            .values()
            .map(|repo| (repo.read(cx).snapshot.work_directory_abs_path.clone(), repo))
            .collect::<HashMap<_, _>>();

        let mut tasks = Vec::new();
        for (work_dir_abs_path, checkpoint) in checkpoint.checkpoints_by_work_dir_abs_path {
            if let Some(repository) = repositories_by_work_dir_abs_path.get(&work_dir_abs_path) {
                let restore = repository.update(cx, |repository, _| {
                    repository.restore_checkpoint(checkpoint)
                });
                tasks.push(async move { restore.await? });
            }
        }
        cx.background_spawn(async move {
            future::try_join_all(tasks).await?;
            Ok(())
        })
    }
1080
1081 /// Compares two checkpoints, returning true if they are equal.
1082 pub fn compare_checkpoints(
1083 &self,
1084 left: GitStoreCheckpoint,
1085 mut right: GitStoreCheckpoint,
1086 cx: &mut App,
1087 ) -> Task<Result<bool>> {
1088 let repositories_by_work_dir_abs_path = self
1089 .repositories
1090 .values()
1091 .map(|repo| (repo.read(cx).snapshot.work_directory_abs_path.clone(), repo))
1092 .collect::<HashMap<_, _>>();
1093
1094 let mut tasks = Vec::new();
1095 for (work_dir_abs_path, left_checkpoint) in left.checkpoints_by_work_dir_abs_path {
1096 if let Some(right_checkpoint) = right
1097 .checkpoints_by_work_dir_abs_path
1098 .remove(&work_dir_abs_path)
1099 {
1100 if let Some(repository) = repositories_by_work_dir_abs_path.get(&work_dir_abs_path)
1101 {
1102 let compare = repository.update(cx, |repository, _| {
1103 repository.compare_checkpoints(left_checkpoint, right_checkpoint)
1104 });
1105
1106 tasks.push(async move { compare.await? });
1107 }
1108 } else {
1109 return Task::ready(Ok(false));
1110 }
1111 }
1112 cx.background_spawn(async move {
1113 Ok(future::try_join_all(tasks)
1114 .await?
1115 .into_iter()
1116 .all(|result| result))
1117 })
1118 }
1119
    /// Blames a buffer.
    ///
    /// When `version` is provided, the blame reflects the buffer's content at
    /// that version; otherwise the current content is used. The blame runs on
    /// the local git backend, or is forwarded upstream for remote projects.
    pub fn blame_buffer(
        &self,
        buffer: &Entity<Buffer>,
        version: Option<clock::Global>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Option<Blame>>> {
        let buffer = buffer.read(cx);
        let Some((repo, repo_path)) =
            self.repository_and_path_for_buffer_id(buffer.remote_id(), cx)
        else {
            return Task::ready(Err(anyhow!("failed to find a git repository for buffer")));
        };
        // Snapshot everything we need from the buffer before going async.
        let content = match &version {
            Some(version) => buffer.rope_for_version(version),
            None => buffer.as_rope().clone(),
        };
        let line_ending = buffer.line_ending();
        let version = version.unwrap_or(buffer.version());
        let buffer_id = buffer.remote_id();

        // Hold the repository weakly so this task doesn't keep it alive.
        let repo = repo.downgrade();
        cx.spawn(async move |_, cx| {
            let repository_state = repo
                .update(cx, |repo, _| repo.repository_state.clone())?
                .await
                .map_err(|err| anyhow::anyhow!(err))?;
            match repository_state {
                // Local repository: blame directly via the git backend.
                RepositoryState::Local(LocalRepositoryState { backend, .. }) => backend
                    .blame(repo_path.clone(), content, line_ending)
                    .await
                    .with_context(|| format!("Failed to blame {:?}", repo_path.as_ref()))
                    .map(Some),
                // Remote project: the host blames by buffer id and version.
                RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
                    let response = client
                        .request(proto::BlameBuffer {
                            project_id: project_id.to_proto(),
                            buffer_id: buffer_id.into(),
                            version: serialize_version(&version),
                        })
                        .await?;
                    Ok(deserialize_blame_buffer_response(response))
                }
            }
        })
    }
1166
1167 pub fn file_history(
1168 &self,
1169 repo: &Entity<Repository>,
1170 path: RepoPath,
1171 cx: &mut App,
1172 ) -> Task<Result<git::repository::FileHistory>> {
1173 let rx = repo.update(cx, |repo, _| repo.file_history(path));
1174
1175 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
1176 }
1177
1178 pub fn file_history_paginated(
1179 &self,
1180 repo: &Entity<Repository>,
1181 path: RepoPath,
1182 skip: usize,
1183 limit: Option<usize>,
1184 cx: &mut App,
1185 ) -> Task<Result<git::repository::FileHistory>> {
1186 let rx = repo.update(cx, |repo, _| repo.file_history_paginated(path, skip, limit));
1187
1188 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
1189 }
1190
    /// Builds a permanent URL to the selected line range of `buffer` on its
    /// git hosting provider, pinned to the repository's current HEAD commit.
    /// Falls back to crate-registry permalinks for Rust sources outside any
    /// repository.
    pub fn get_permalink_to_line(
        &self,
        buffer: &Entity<Buffer>,
        selection: Range<u32>,
        cx: &mut App,
    ) -> Task<Result<url::Url>> {
        let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
            return Task::ready(Err(anyhow!("buffer has no file")));
        };

        let Some((repo, repo_path)) = self.repository_and_path_for_project_path(
            &(file.worktree.read(cx).id(), file.path.clone()).into(),
            cx,
        ) else {
            // If we're not in a Git repo, check whether this is a Rust source
            // file in the Cargo registry (presumably opened with go-to-definition
            // from a normal Rust file). If so, we can put together a permalink
            // using crate metadata.
            if buffer
                .read(cx)
                .language()
                .is_none_or(|lang| lang.name() != "Rust")
            {
                return Task::ready(Err(anyhow!("no permalink available")));
            }
            let file_path = file.worktree.read(cx).absolutize(&file.path);
            return cx.spawn(async move |cx| {
                let provider_registry = cx.update(GitHostingProviderRegistry::default_global);
                get_permalink_in_rust_registry_src(provider_registry, file_path, selection)
                    .context("no permalink available")
            });
        };

        let buffer_id = buffer.read(cx).remote_id();
        // Prefer the remote tracked by the current branch's upstream; fall
        // back to "origin" when there is no upstream.
        let branch = repo.read(cx).branch.clone();
        let remote = branch
            .as_ref()
            .and_then(|b| b.upstream.as_ref())
            .and_then(|b| b.remote_name())
            .unwrap_or("origin")
            .to_string();

        let rx = repo.update(cx, |repo, _| {
            repo.send_job(None, move |state, cx| async move {
                match state {
                    RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
                        let origin_url = backend
                            .remote_url(&remote)
                            .await
                            .with_context(|| format!("remote \"{remote}\" not found"))?;

                        // Pin the permalink to the current HEAD commit so the
                        // link stays valid as the branch moves.
                        let sha = backend.head_sha().await.context("reading HEAD SHA")?;

                        let provider_registry =
                            cx.update(GitHostingProviderRegistry::default_global);

                        let (provider, remote) =
                            parse_git_remote_url(provider_registry, &origin_url)
                                .context("parsing Git remote URL")?;

                        Ok(provider.build_permalink(
                            remote,
                            BuildPermalinkParams::new(&sha, &repo_path, Some(selection)),
                        ))
                    }
                    // Remote project: ask the host to compute the permalink.
                    RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
                        let response = client
                            .request(proto::GetPermalinkToLine {
                                project_id: project_id.to_proto(),
                                buffer_id: buffer_id.into(),
                                selection: Some(proto::Range {
                                    start: selection.start as u64,
                                    end: selection.end as u64,
                                }),
                            })
                            .await?;

                        url::Url::parse(&response.permalink).context("failed to parse permalink")
                    }
                }
            })
        });
        cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
    }
1275
1276 fn downstream_client(&self) -> Option<(AnyProtoClient, ProjectId)> {
1277 match &self.state {
1278 GitStoreState::Local {
1279 downstream: downstream_client,
1280 ..
1281 } => downstream_client
1282 .as_ref()
1283 .map(|state| (state.client.clone(), state.project_id)),
1284 GitStoreState::Remote {
1285 downstream: downstream_client,
1286 ..
1287 } => downstream_client.clone(),
1288 }
1289 }
1290
1291 fn upstream_client(&self) -> Option<AnyProtoClient> {
1292 match &self.state {
1293 GitStoreState::Local { .. } => None,
1294 GitStoreState::Remote {
1295 upstream_client, ..
1296 } => Some(upstream_client.clone()),
1297 }
1298 }
1299
    /// Keeps the set of local repositories in sync with worktree-store
    /// notifications: routes file changes to the owning repositories, adds or
    /// rescans repositories when worktrees report them, and drops repositories
    /// whose last worktree is removed.
    fn on_worktree_store_event(
        &mut self,
        worktree_store: Entity<WorktreeStore>,
        event: &WorktreeStoreEvent,
        cx: &mut Context<Self>,
    ) {
        // Repository discovery is driven by the local filesystem; a remote
        // store learns about repositories via RPC instead.
        let GitStoreState::Local {
            project_environment,
            downstream,
            next_repository_id,
            fs,
        } = &self.state
        else {
            return;
        };

        match event {
            WorktreeStoreEvent::WorktreeUpdatedEntries(worktree_id, updated_entries) => {
                if let Some(worktree) = self
                    .worktree_store
                    .read(cx)
                    .worktree_for_id(*worktree_id, cx)
                {
                    // Bucket the changed paths by the repository that owns
                    // them, then notify each repository asynchronously.
                    let paths_by_git_repo =
                        self.process_updated_entries(&worktree, updated_entries, cx);
                    let downstream = downstream
                        .as_ref()
                        .map(|downstream| downstream.updates_tx.clone());
                    cx.spawn(async move |_, cx| {
                        let paths_by_git_repo = paths_by_git_repo.await;
                        for (repo, paths) in paths_by_git_repo {
                            repo.update(cx, |repo, cx| {
                                repo.paths_changed(paths, downstream.clone(), cx);
                            });
                        }
                    })
                    .detach();
                }
            }
            WorktreeStoreEvent::WorktreeUpdatedGitRepositories(worktree_id, changed_repos) => {
                let Some(worktree) = worktree_store.read(cx).worktree_for_id(*worktree_id, cx)
                else {
                    return;
                };
                // Invisible worktrees (e.g. single-file ones) don't contribute
                // repositories.
                if !worktree.read(cx).is_visible() {
                    log::debug!(
                        "not adding repositories for local worktree {:?} because it's not visible",
                        worktree.read(cx).abs_path()
                    );
                    return;
                }
                self.update_repositories_from_worktree(
                    *worktree_id,
                    project_environment.clone(),
                    next_repository_id.clone(),
                    downstream
                        .as_ref()
                        .map(|downstream| downstream.updates_tx.clone()),
                    changed_repos.clone(),
                    fs.clone(),
                    cx,
                );
                self.local_worktree_git_repos_changed(worktree, changed_repos, cx);
            }
            WorktreeStoreEvent::WorktreeRemoved(_entity_id, worktree_id) => {
                // Detach the worktree from every repository; repositories left
                // with no associated worktrees are removed entirely.
                let repos_without_worktree: Vec<RepositoryId> = self
                    .worktree_ids
                    .iter_mut()
                    .filter_map(|(repo_id, worktree_ids)| {
                        worktree_ids.remove(worktree_id);
                        if worktree_ids.is_empty() {
                            Some(*repo_id)
                        } else {
                            None
                        }
                    })
                    .collect();
                let is_active_repo_removed = repos_without_worktree
                    .iter()
                    .any(|repo_id| self.active_repo_id == Some(*repo_id));

                for repo_id in repos_without_worktree {
                    self.repositories.remove(&repo_id);
                    self.worktree_ids.remove(&repo_id);
                    // Tell downstream collaborators the repository is gone.
                    if let Some(updates_tx) =
                        downstream.as_ref().map(|downstream| &downstream.updates_tx)
                    {
                        updates_tx
                            .unbounded_send(DownstreamUpdate::RemoveRepository(repo_id))
                            .ok();
                    }
                }

                // If the active repository went away, promote an arbitrary
                // remaining one (or none) and notify observers.
                if is_active_repo_removed {
                    if let Some((&repo_id, _)) = self.repositories.iter().next() {
                        self.active_repo_id = Some(repo_id);
                        cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(repo_id)));
                    } else {
                        self.active_repo_id = None;
                        cx.emit(GitStoreEvent::ActiveRepositoryChanged(None));
                    }
                }
            }
            _ => {}
        }
    }
    /// Responds to a repository's update events: refreshes conflict tracking
    /// for every open buffer that lives in that repository, then re-emits the
    /// event at the store level.
    fn on_repository_event(
        &mut self,
        repo: Entity<Repository>,
        event: &RepositoryEvent,
        cx: &mut Context<Self>,
    ) {
        let id = repo.read(cx).id;
        let repo_snapshot = repo.read(cx).snapshot.clone();
        for (buffer_id, diff) in self.diffs.iter() {
            // Only consider buffers belonging to the repository that changed.
            if let Some((buffer_repo, repo_path)) =
                self.repository_and_path_for_buffer_id(*buffer_id, cx)
                && buffer_repo == repo
            {
                diff.update(cx, |diff, cx| {
                    if let Some(conflict_set) = &diff.conflict_set {
                        // Sync the conflict flag with the latest snapshot;
                        // `set_has_conflict` reports whether the flag flipped.
                        let conflict_status_changed =
                            conflict_set.update(cx, |conflict_set, cx| {
                                let has_conflict = repo_snapshot.has_conflict(&repo_path);
                                conflict_set.set_has_conflict(has_conflict, cx)
                            })?;
                        if conflict_status_changed {
                            // Re-scan the buffer for conflict markers now that
                            // its conflict status changed.
                            let buffer_store = self.buffer_store.read(cx);
                            if let Some(buffer) = buffer_store.get(*buffer_id) {
                                let _ = diff
                                    .reparse_conflict_markers(buffer.read(cx).text_snapshot(), cx);
                            }
                        }
                    }
                    anyhow::Ok(())
                })
                .ok();
            }
        }
        cx.emit(GitStoreEvent::RepositoryUpdated(
            id,
            event.clone(),
            self.active_repo_id == Some(id),
        ))
    }
1445
    /// Forwards a repository's `JobsUpdated` notification as a store-level event.
    fn on_jobs_updated(&mut self, _: Entity<Repository>, _: &JobsUpdated, cx: &mut Context<Self>) {
        cx.emit(GitStoreEvent::JobsUpdated)
    }
1449
    /// Updates our list of repositories and schedules git scans in response to
    /// a notification from a worktree.
    ///
    /// Each reported repository is matched (by old or new work-directory path)
    /// against the ones already tracked: existing entries are rescanned or
    /// detached from this worktree, and previously unseen repositories are
    /// created, subscribed to, and scanned.
    fn update_repositories_from_worktree(
        &mut self,
        worktree_id: WorktreeId,
        project_environment: Entity<ProjectEnvironment>,
        next_repository_id: Arc<AtomicU64>,
        updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
        updated_git_repositories: UpdatedGitRepositoriesSet,
        fs: Arc<dyn Fs>,
        cx: &mut Context<Self>,
    ) {
        let mut removed_ids = Vec::new();
        for update in updated_git_repositories.iter() {
            // An update refers to an existing repository when either its old
            // or its new work directory matches one we already track.
            if let Some((id, existing)) = self.repositories.iter().find(|(_, repo)| {
                let existing_work_directory_abs_path =
                    repo.read(cx).work_directory_abs_path.clone();
                Some(&existing_work_directory_abs_path)
                    == update.old_work_directory_abs_path.as_ref()
                    || Some(&existing_work_directory_abs_path)
                        == update.new_work_directory_abs_path.as_ref()
            }) {
                let repo_id = *id;
                if let Some(new_work_directory_abs_path) =
                    update.new_work_directory_abs_path.clone()
                {
                    // The repository still exists (possibly moved): associate
                    // it with this worktree and rescan it.
                    self.worktree_ids
                        .entry(repo_id)
                        .or_insert_with(HashSet::new)
                        .insert(worktree_id);
                    existing.update(cx, |existing, cx| {
                        existing.snapshot.work_directory_abs_path = new_work_directory_abs_path;
                        existing.schedule_scan(updates_tx.clone(), cx);
                    });
                } else {
                    // The repository disappeared from this worktree; drop it
                    // entirely once no other worktree references it.
                    if let Some(worktree_ids) = self.worktree_ids.get_mut(&repo_id) {
                        worktree_ids.remove(&worktree_id);
                        if worktree_ids.is_empty() {
                            removed_ids.push(repo_id);
                        }
                    }
                }
            } else if let UpdatedGitRepository {
                new_work_directory_abs_path: Some(work_directory_abs_path),
                dot_git_abs_path: Some(dot_git_abs_path),
                repository_dir_abs_path: Some(_repository_dir_abs_path),
                common_dir_abs_path: Some(_common_dir_abs_path),
                ..
            } = update
            {
                // A repository we haven't seen before: allocate an id and spin
                // up a local `Repository` entity for it.
                let id = RepositoryId(next_repository_id.fetch_add(1, atomic::Ordering::Release));
                let git_store = cx.weak_entity();
                let repo = cx.new(|cx| {
                    let mut repo = Repository::local(
                        id,
                        work_directory_abs_path.clone(),
                        dot_git_abs_path.clone(),
                        project_environment.downgrade(),
                        fs.clone(),
                        git_store,
                        cx,
                    );
                    if let Some(updates_tx) = updates_tx.as_ref() {
                        // trigger an empty `UpdateRepository` to ensure remote active_repo_id is set correctly
                        updates_tx
                            .unbounded_send(DownstreamUpdate::UpdateRepository(repo.snapshot()))
                            .ok();
                    }
                    repo.schedule_scan(updates_tx.clone(), cx);
                    repo
                });
                self._subscriptions
                    .push(cx.subscribe(&repo, Self::on_repository_event));
                self._subscriptions
                    .push(cx.subscribe(&repo, Self::on_jobs_updated));
                self.repositories.insert(id, repo);
                self.worktree_ids.insert(id, HashSet::from([worktree_id]));
                cx.emit(GitStoreEvent::RepositoryAdded);
                // The first repository discovered becomes the active one.
                self.active_repo_id.get_or_insert_with(|| {
                    cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
                    id
                });
            }
        }

        for id in removed_ids {
            if self.active_repo_id == Some(id) {
                self.active_repo_id = None;
                cx.emit(GitStoreEvent::ActiveRepositoryChanged(None));
            }
            self.repositories.remove(&id);
            // Tell downstream collaborators the repository is gone.
            if let Some(updates_tx) = updates_tx.as_ref() {
                updates_tx
                    .unbounded_send(DownstreamUpdate::RemoveRepository(id))
                    .ok();
            }
        }
    }
1547
    /// Keeps per-buffer git diff state in sync with buffer lifecycle events
    /// from the buffer store.
    fn on_buffer_store_event(
        &mut self,
        _: Entity<BufferStore>,
        event: &BufferStoreEvent,
        cx: &mut Context<Self>,
    ) {
        match event {
            BufferStoreEvent::BufferAdded(buffer) => {
                // Propagate language changes to the buffer's diff state.
                cx.subscribe(buffer, |this, buffer, event, cx| {
                    if let BufferEvent::LanguageChanged(_) = event {
                        let buffer_id = buffer.read(cx).remote_id();
                        if let Some(diff_state) = this.diffs.get(&buffer_id) {
                            diff_state.update(cx, |diff_state, cx| {
                                diff_state.buffer_language_changed(buffer, cx);
                            });
                        }
                    }
                })
                .detach();
            }
            BufferStoreEvent::SharedBufferClosed(peer_id, buffer_id) => {
                // A collaborator closed the buffer: drop their shared diff.
                if let Some(diffs) = self.shared_diffs.get_mut(peer_id) {
                    diffs.remove(buffer_id);
                }
            }
            BufferStoreEvent::BufferDropped(buffer_id) => {
                // The buffer is gone: drop all diff state associated with it.
                self.diffs.remove(buffer_id);
                for diffs in self.shared_diffs.values_mut() {
                    diffs.remove(buffer_id);
                }
            }
            BufferStoreEvent::BufferChangedFilePath { buffer, .. } => {
                // Whenever a buffer's file path changes, it's possible that the
                // new path is actually a path that is being tracked by a git
                // repository. In that case, we'll want to update the buffer's
                // `BufferDiffState`, in case it already has one.
                let buffer_id = buffer.read(cx).remote_id();
                let diff_state = self.diffs.get(&buffer_id);
                let repo = self.repository_and_path_for_buffer_id(buffer_id, cx);

                if let Some(diff_state) = diff_state
                    && let Some((repo, repo_path)) = repo
                {
                    let buffer = buffer.clone();
                    let diff_state = diff_state.clone();

                    cx.spawn(async move |_git_store, cx| {
                        // Load the committed text for the new path, then feed
                        // it into the diff state as the new diff base.
                        async {
                            let diff_bases_change = repo
                                .update(cx, |repo, cx| {
                                    repo.load_committed_text(buffer_id, repo_path, cx)
                                })
                                .await?;

                            diff_state.update(cx, |diff_state, cx| {
                                let buffer_snapshot = buffer.read(cx).text_snapshot();
                                diff_state.diff_bases_changed(
                                    buffer_snapshot,
                                    Some(diff_bases_change),
                                    cx,
                                );
                            });
                            anyhow::Ok(())
                        }
                        .await
                        .log_err();
                    })
                    .detach();
                }
            }
        }
    }
1620
    /// Kicks off diff recalculation and conflict-marker reparsing for each of
    /// the given buffers that has diff state, returning a future that resolves
    /// once all of the resulting work has completed.
    pub fn recalculate_buffer_diffs(
        &mut self,
        buffers: Vec<Entity<Buffer>>,
        cx: &mut Context<Self>,
    ) -> impl Future<Output = ()> + use<> {
        let mut futures = Vec::new();
        for buffer in buffers {
            if let Some(diff_state) = self.diffs.get_mut(&buffer.read(cx).remote_id()) {
                let buffer = buffer.read(cx).text_snapshot();
                diff_state.update(cx, |diff_state, cx| {
                    diff_state.recalculate_diffs(buffer.clone(), cx);
                    // `wait_for_recalculation` may return nothing if no
                    // recalculation is pending; `extend` handles both cases.
                    futures.extend(diff_state.wait_for_recalculation().map(FutureExt::boxed));
                });
                futures.push(diff_state.update(cx, |diff_state, cx| {
                    diff_state
                        .reparse_conflict_markers(buffer, cx)
                        .map(|_| {})
                        .boxed()
                }));
            }
        }
        async move {
            futures::future::join_all(futures).await;
        }
    }
1646
    /// Writes hunk staging/unstaging performed on a buffer diff back to the
    /// git index, rolling back the diff's pending hunks if the write fails.
    fn on_buffer_diff_event(
        &mut self,
        diff: Entity<buffer_diff::BufferDiff>,
        event: &BufferDiffEvent,
        cx: &mut Context<Self>,
    ) {
        if let BufferDiffEvent::HunksStagedOrUnstaged(new_index_text) = event {
            let buffer_id = diff.read(cx).buffer_id;
            if let Some(diff_state) = self.diffs.get(&buffer_id) {
                // Bump and capture the per-buffer operation counter; it is
                // passed along with the index-write job (NOTE(review): appears
                // to let stale writes be detected — confirm in
                // `spawn_set_index_text_job`).
                let hunk_staging_operation_count = diff_state.update(cx, |diff_state, _| {
                    diff_state.hunk_staging_operation_count += 1;
                    diff_state.hunk_staging_operation_count
                });
                if let Some((repo, path)) = self.repository_and_path_for_buffer_id(buffer_id, cx) {
                    let recv = repo.update(cx, |repo, cx| {
                        log::debug!("hunks changed for {}", path.as_unix_str());
                        repo.spawn_set_index_text_job(
                            path,
                            new_index_text.as_ref().map(|rope| rope.to_string()),
                            Some(hunk_staging_operation_count),
                            cx,
                        )
                    });
                    let diff = diff.downgrade();
                    cx.spawn(async move |this, cx| {
                        if let Ok(Err(error)) = cx.background_spawn(recv).await {
                            // The index write failed: clear the optimistic
                            // pending hunks and surface the error.
                            diff.update(cx, |diff, cx| {
                                diff.clear_pending_hunks(cx);
                            })
                            .ok();
                            this.update(cx, |_, cx| cx.emit(GitStoreEvent::IndexWriteError(error)))
                                .ok();
                        }
                    })
                    .detach();
                }
            }
        }
    }
1686
1687 fn local_worktree_git_repos_changed(
1688 &mut self,
1689 worktree: Entity<Worktree>,
1690 changed_repos: &UpdatedGitRepositoriesSet,
1691 cx: &mut Context<Self>,
1692 ) {
1693 log::debug!("local worktree repos changed");
1694 debug_assert!(worktree.read(cx).is_local());
1695
1696 for repository in self.repositories.values() {
1697 repository.update(cx, |repository, cx| {
1698 let repo_abs_path = &repository.work_directory_abs_path;
1699 if changed_repos.iter().any(|update| {
1700 update.old_work_directory_abs_path.as_ref() == Some(repo_abs_path)
1701 || update.new_work_directory_abs_path.as_ref() == Some(repo_abs_path)
1702 }) {
1703 repository.reload_buffer_diff_bases(cx);
1704 }
1705 });
1706 }
1707 }
1708
    /// All repositories currently tracked by this store, keyed by id.
    pub fn repositories(&self) -> &HashMap<RepositoryId, Entity<Repository>> {
        &self.repositories
    }
1712
1713 pub fn status_for_buffer_id(&self, buffer_id: BufferId, cx: &App) -> Option<FileStatus> {
1714 let (repo, path) = self.repository_and_path_for_buffer_id(buffer_id, cx)?;
1715 let status = repo.read(cx).snapshot.status_for_path(&path)?;
1716 Some(status.status)
1717 }
1718
1719 pub fn repository_and_path_for_buffer_id(
1720 &self,
1721 buffer_id: BufferId,
1722 cx: &App,
1723 ) -> Option<(Entity<Repository>, RepoPath)> {
1724 let buffer = self.buffer_store.read(cx).get(buffer_id)?;
1725 let project_path = buffer.read(cx).project_path(cx)?;
1726 self.repository_and_path_for_project_path(&project_path, cx)
1727 }
1728
1729 pub fn repository_and_path_for_project_path(
1730 &self,
1731 path: &ProjectPath,
1732 cx: &App,
1733 ) -> Option<(Entity<Repository>, RepoPath)> {
1734 let abs_path = self.worktree_store.read(cx).absolutize(path, cx)?;
1735 self.repositories
1736 .values()
1737 .filter_map(|repo| {
1738 let repo_path = repo.read(cx).abs_path_to_repo_path(&abs_path)?;
1739 Some((repo.clone(), repo_path))
1740 })
1741 .max_by_key(|(repo, _)| repo.read(cx).work_directory_abs_path.clone())
1742 }
1743
1744 pub fn git_init(
1745 &self,
1746 path: Arc<Path>,
1747 fallback_branch_name: String,
1748 cx: &App,
1749 ) -> Task<Result<()>> {
1750 match &self.state {
1751 GitStoreState::Local { fs, .. } => {
1752 let fs = fs.clone();
1753 cx.background_executor()
1754 .spawn(async move { fs.git_init(&path, fallback_branch_name).await })
1755 }
1756 GitStoreState::Remote {
1757 upstream_client,
1758 upstream_project_id: project_id,
1759 ..
1760 } => {
1761 let client = upstream_client.clone();
1762 let project_id = *project_id;
1763 cx.background_executor().spawn(async move {
1764 client
1765 .request(proto::GitInit {
1766 project_id: project_id,
1767 abs_path: path.to_string_lossy().into_owned(),
1768 fallback_branch_name,
1769 })
1770 .await?;
1771 Ok(())
1772 })
1773 }
1774 }
1775 }
1776
1777 pub fn git_clone(
1778 &self,
1779 repo: String,
1780 path: impl Into<Arc<std::path::Path>>,
1781 cx: &App,
1782 ) -> Task<Result<()>> {
1783 let path = path.into();
1784 match &self.state {
1785 GitStoreState::Local { fs, .. } => {
1786 let fs = fs.clone();
1787 cx.background_executor()
1788 .spawn(async move { fs.git_clone(&repo, &path).await })
1789 }
1790 GitStoreState::Remote {
1791 upstream_client,
1792 upstream_project_id,
1793 ..
1794 } => {
1795 if upstream_client.is_via_collab() {
1796 return Task::ready(Err(anyhow!(
1797 "Git Clone isn't supported for project guests"
1798 )));
1799 }
1800 let request = upstream_client.request(proto::GitClone {
1801 project_id: *upstream_project_id,
1802 abs_path: path.to_string_lossy().into_owned(),
1803 remote_repo: repo,
1804 });
1805
1806 cx.background_spawn(async move {
1807 let result = request.await?;
1808
1809 match result.success {
1810 true => Ok(()),
1811 false => Err(anyhow!("Git Clone failed")),
1812 }
1813 })
1814 }
1815 }
1816 }
1817
    /// RPC handler: applies an `UpdateRepository` message from upstream,
    /// lazily creating the remote repository entity on first sight, and
    /// re-forwards the message downstream when this store is itself shared.
    async fn handle_update_repository(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::UpdateRepository>,
        mut cx: AsyncApp,
    ) -> Result<()> {
        this.update(&mut cx, |this, cx| {
            let path_style = this.worktree_store.read(cx).path_style();
            let mut update = envelope.payload;

            let id = RepositoryId::from_proto(update.id);
            let client = this.upstream_client().context("no upstream client")?;

            // Create the repository on its first update. The subscription
            // can't be pushed inside the closure (`this` is already mutably
            // borrowed through `entry`), so it is stashed and added below.
            let mut repo_subscription = None;
            let repo = this.repositories.entry(id).or_insert_with(|| {
                let git_store = cx.weak_entity();
                let repo = cx.new(|cx| {
                    Repository::remote(
                        id,
                        Path::new(&update.abs_path).into(),
                        path_style,
                        ProjectId(update.project_id),
                        client,
                        git_store,
                        cx,
                    )
                });
                repo_subscription = Some(cx.subscribe(&repo, Self::on_repository_event));
                cx.emit(GitStoreEvent::RepositoryAdded);
                repo
            });
            this._subscriptions.extend(repo_subscription);

            repo.update(cx, {
                let update = update.clone();
                |repo, cx| repo.apply_remote_update(update, cx)
            })?;

            // The first repository seen becomes the active one.
            this.active_repo_id.get_or_insert_with(|| {
                cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
                id
            });

            // Re-forward to downstream collaborators under our project id.
            if let Some((client, project_id)) = this.downstream_client() {
                update.project_id = project_id.to_proto();
                client.send(update).log_err();
            }
            Ok(())
        })
    }
1867
1868 async fn handle_remove_repository(
1869 this: Entity<Self>,
1870 envelope: TypedEnvelope<proto::RemoveRepository>,
1871 mut cx: AsyncApp,
1872 ) -> Result<()> {
1873 this.update(&mut cx, |this, cx| {
1874 let mut update = envelope.payload;
1875 let id = RepositoryId::from_proto(update.id);
1876 this.repositories.remove(&id);
1877 if let Some((client, project_id)) = this.downstream_client() {
1878 update.project_id = project_id.to_proto();
1879 client.send(update).log_err();
1880 }
1881 if this.active_repo_id == Some(id) {
1882 this.active_repo_id = None;
1883 cx.emit(GitStoreEvent::ActiveRepositoryChanged(None));
1884 }
1885 cx.emit(GitStoreEvent::RepositoryRemoved(id));
1886 });
1887 Ok(())
1888 }
1889
1890 async fn handle_git_init(
1891 this: Entity<Self>,
1892 envelope: TypedEnvelope<proto::GitInit>,
1893 cx: AsyncApp,
1894 ) -> Result<proto::Ack> {
1895 let path: Arc<Path> = PathBuf::from(envelope.payload.abs_path).into();
1896 let name = envelope.payload.fallback_branch_name;
1897 cx.update(|cx| this.read(cx).git_init(path, name, cx))
1898 .await?;
1899
1900 Ok(proto::Ack {})
1901 }
1902
1903 async fn handle_git_clone(
1904 this: Entity<Self>,
1905 envelope: TypedEnvelope<proto::GitClone>,
1906 cx: AsyncApp,
1907 ) -> Result<proto::GitCloneResponse> {
1908 let path: Arc<Path> = PathBuf::from(envelope.payload.abs_path).into();
1909 let repo_name = envelope.payload.remote_repo;
1910 let result = cx
1911 .update(|cx| this.read(cx).git_clone(repo_name, path, cx))
1912 .await;
1913
1914 Ok(proto::GitCloneResponse {
1915 success: result.is_ok(),
1916 })
1917 }
1918
1919 async fn handle_fetch(
1920 this: Entity<Self>,
1921 envelope: TypedEnvelope<proto::Fetch>,
1922 mut cx: AsyncApp,
1923 ) -> Result<proto::RemoteMessageResponse> {
1924 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1925 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1926 let fetch_options = FetchOptions::from_proto(envelope.payload.remote);
1927 let askpass_id = envelope.payload.askpass_id;
1928
1929 let askpass = make_remote_delegate(
1930 this,
1931 envelope.payload.project_id,
1932 repository_id,
1933 askpass_id,
1934 &mut cx,
1935 );
1936
1937 let remote_output = repository_handle
1938 .update(&mut cx, |repository_handle, cx| {
1939 repository_handle.fetch(fetch_options, askpass, cx)
1940 })
1941 .await??;
1942
1943 Ok(proto::RemoteMessageResponse {
1944 stdout: remote_output.stdout,
1945 stderr: remote_output.stderr,
1946 })
1947 }
1948
    /// RPC handler: pushes a branch to a remote on behalf of a peer, relaying
    /// askpass prompts back through the requesting connection.
    async fn handle_push(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::Push>,
        mut cx: AsyncApp,
    ) -> Result<proto::RemoteMessageResponse> {
        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
        let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;

        let askpass_id = envelope.payload.askpass_id;
        let askpass = make_remote_delegate(
            this,
            envelope.payload.project_id,
            repository_id,
            askpass_id,
            &mut cx,
        );

        // `options` is an optional enum in the protobuf. Presence is checked
        // with `as_ref()` first, then the value is decoded via the generated
        // accessor `options()` — which would otherwise silently yield a
        // default when the field is unset.
        let options = envelope
            .payload
            .options
            .as_ref()
            .map(|_| match envelope.payload.options() {
                proto::push::PushOptions::SetUpstream => git::repository::PushOptions::SetUpstream,
                proto::push::PushOptions::Force => git::repository::PushOptions::Force,
            });

        let branch_name = envelope.payload.branch_name.into();
        let remote_branch_name = envelope.payload.remote_branch_name.into();
        let remote_name = envelope.payload.remote_name.into();

        let remote_output = repository_handle
            .update(&mut cx, |repository_handle, cx| {
                repository_handle.push(
                    branch_name,
                    remote_branch_name,
                    remote_name,
                    options,
                    askpass,
                    cx,
                )
            })
            .await??;
        Ok(proto::RemoteMessageResponse {
            stdout: remote_output.stdout,
            stderr: remote_output.stderr,
        })
    }
1996
1997 async fn handle_pull(
1998 this: Entity<Self>,
1999 envelope: TypedEnvelope<proto::Pull>,
2000 mut cx: AsyncApp,
2001 ) -> Result<proto::RemoteMessageResponse> {
2002 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2003 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2004 let askpass_id = envelope.payload.askpass_id;
2005 let askpass = make_remote_delegate(
2006 this,
2007 envelope.payload.project_id,
2008 repository_id,
2009 askpass_id,
2010 &mut cx,
2011 );
2012
2013 let branch_name = envelope.payload.branch_name.map(|name| name.into());
2014 let remote_name = envelope.payload.remote_name.into();
2015 let rebase = envelope.payload.rebase;
2016
2017 let remote_message = repository_handle
2018 .update(&mut cx, |repository_handle, cx| {
2019 repository_handle.pull(branch_name, remote_name, rebase, askpass, cx)
2020 })
2021 .await??;
2022
2023 Ok(proto::RemoteMessageResponse {
2024 stdout: remote_message.stdout,
2025 stderr: remote_message.stderr,
2026 })
2027 }
2028
2029 async fn handle_stage(
2030 this: Entity<Self>,
2031 envelope: TypedEnvelope<proto::Stage>,
2032 mut cx: AsyncApp,
2033 ) -> Result<proto::Ack> {
2034 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2035 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2036
2037 let entries = envelope
2038 .payload
2039 .paths
2040 .into_iter()
2041 .map(|path| RepoPath::new(&path))
2042 .collect::<Result<Vec<_>>>()?;
2043
2044 repository_handle
2045 .update(&mut cx, |repository_handle, cx| {
2046 repository_handle.stage_entries(entries, cx)
2047 })
2048 .await?;
2049 Ok(proto::Ack {})
2050 }
2051
2052 async fn handle_unstage(
2053 this: Entity<Self>,
2054 envelope: TypedEnvelope<proto::Unstage>,
2055 mut cx: AsyncApp,
2056 ) -> Result<proto::Ack> {
2057 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2058 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2059
2060 let entries = envelope
2061 .payload
2062 .paths
2063 .into_iter()
2064 .map(|path| RepoPath::new(&path))
2065 .collect::<Result<Vec<_>>>()?;
2066
2067 repository_handle
2068 .update(&mut cx, |repository_handle, cx| {
2069 repository_handle.unstage_entries(entries, cx)
2070 })
2071 .await?;
2072
2073 Ok(proto::Ack {})
2074 }
2075
2076 async fn handle_stash(
2077 this: Entity<Self>,
2078 envelope: TypedEnvelope<proto::Stash>,
2079 mut cx: AsyncApp,
2080 ) -> Result<proto::Ack> {
2081 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2082 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2083
2084 let entries = envelope
2085 .payload
2086 .paths
2087 .into_iter()
2088 .map(|path| RepoPath::new(&path))
2089 .collect::<Result<Vec<_>>>()?;
2090
2091 repository_handle
2092 .update(&mut cx, |repository_handle, cx| {
2093 repository_handle.stash_entries(entries, cx)
2094 })
2095 .await?;
2096
2097 Ok(proto::Ack {})
2098 }
2099
2100 async fn handle_stash_pop(
2101 this: Entity<Self>,
2102 envelope: TypedEnvelope<proto::StashPop>,
2103 mut cx: AsyncApp,
2104 ) -> Result<proto::Ack> {
2105 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2106 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2107 let stash_index = envelope.payload.stash_index.map(|i| i as usize);
2108
2109 repository_handle
2110 .update(&mut cx, |repository_handle, cx| {
2111 repository_handle.stash_pop(stash_index, cx)
2112 })
2113 .await?;
2114
2115 Ok(proto::Ack {})
2116 }
2117
2118 async fn handle_stash_apply(
2119 this: Entity<Self>,
2120 envelope: TypedEnvelope<proto::StashApply>,
2121 mut cx: AsyncApp,
2122 ) -> Result<proto::Ack> {
2123 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2124 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2125 let stash_index = envelope.payload.stash_index.map(|i| i as usize);
2126
2127 repository_handle
2128 .update(&mut cx, |repository_handle, cx| {
2129 repository_handle.stash_apply(stash_index, cx)
2130 })
2131 .await?;
2132
2133 Ok(proto::Ack {})
2134 }
2135
2136 async fn handle_stash_drop(
2137 this: Entity<Self>,
2138 envelope: TypedEnvelope<proto::StashDrop>,
2139 mut cx: AsyncApp,
2140 ) -> Result<proto::Ack> {
2141 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2142 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2143 let stash_index = envelope.payload.stash_index.map(|i| i as usize);
2144
2145 repository_handle
2146 .update(&mut cx, |repository_handle, cx| {
2147 repository_handle.stash_drop(stash_index, cx)
2148 })
2149 .await??;
2150
2151 Ok(proto::Ack {})
2152 }
2153
2154 async fn handle_set_index_text(
2155 this: Entity<Self>,
2156 envelope: TypedEnvelope<proto::SetIndexText>,
2157 mut cx: AsyncApp,
2158 ) -> Result<proto::Ack> {
2159 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2160 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2161 let repo_path = RepoPath::from_proto(&envelope.payload.path)?;
2162
2163 repository_handle
2164 .update(&mut cx, |repository_handle, cx| {
2165 repository_handle.spawn_set_index_text_job(
2166 repo_path,
2167 envelope.payload.text,
2168 None,
2169 cx,
2170 )
2171 })
2172 .await??;
2173 Ok(proto::Ack {})
2174 }
2175
2176 async fn handle_run_hook(
2177 this: Entity<Self>,
2178 envelope: TypedEnvelope<proto::RunGitHook>,
2179 mut cx: AsyncApp,
2180 ) -> Result<proto::Ack> {
2181 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2182 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2183 let hook = RunHook::from_proto(envelope.payload.hook).context("invalid hook")?;
2184 repository_handle
2185 .update(&mut cx, |repository_handle, cx| {
2186 repository_handle.run_hook(hook, cx)
2187 })
2188 .await??;
2189 Ok(proto::Ack {})
2190 }
2191
2192 async fn handle_commit(
2193 this: Entity<Self>,
2194 envelope: TypedEnvelope<proto::Commit>,
2195 mut cx: AsyncApp,
2196 ) -> Result<proto::Ack> {
2197 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2198 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2199 let askpass_id = envelope.payload.askpass_id;
2200
2201 let askpass = make_remote_delegate(
2202 this,
2203 envelope.payload.project_id,
2204 repository_id,
2205 askpass_id,
2206 &mut cx,
2207 );
2208
2209 let message = SharedString::from(envelope.payload.message);
2210 let name = envelope.payload.name.map(SharedString::from);
2211 let email = envelope.payload.email.map(SharedString::from);
2212 let options = envelope.payload.options.unwrap_or_default();
2213
2214 repository_handle
2215 .update(&mut cx, |repository_handle, cx| {
2216 repository_handle.commit(
2217 message,
2218 name.zip(email),
2219 CommitOptions {
2220 amend: options.amend,
2221 signoff: options.signoff,
2222 },
2223 askpass,
2224 cx,
2225 )
2226 })
2227 .await??;
2228 Ok(proto::Ack {})
2229 }
2230
2231 async fn handle_get_remotes(
2232 this: Entity<Self>,
2233 envelope: TypedEnvelope<proto::GetRemotes>,
2234 mut cx: AsyncApp,
2235 ) -> Result<proto::GetRemotesResponse> {
2236 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2237 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2238
2239 let branch_name = envelope.payload.branch_name;
2240 let is_push = envelope.payload.is_push;
2241
2242 let remotes = repository_handle
2243 .update(&mut cx, |repository_handle, _| {
2244 repository_handle.get_remotes(branch_name, is_push)
2245 })
2246 .await??;
2247
2248 Ok(proto::GetRemotesResponse {
2249 remotes: remotes
2250 .into_iter()
2251 .map(|remotes| proto::get_remotes_response::Remote {
2252 name: remotes.name.to_string(),
2253 })
2254 .collect::<Vec<_>>(),
2255 })
2256 }
2257
2258 async fn handle_get_worktrees(
2259 this: Entity<Self>,
2260 envelope: TypedEnvelope<proto::GitGetWorktrees>,
2261 mut cx: AsyncApp,
2262 ) -> Result<proto::GitWorktreesResponse> {
2263 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2264 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2265
2266 let worktrees = repository_handle
2267 .update(&mut cx, |repository_handle, _| {
2268 repository_handle.worktrees()
2269 })
2270 .await??;
2271
2272 Ok(proto::GitWorktreesResponse {
2273 worktrees: worktrees
2274 .into_iter()
2275 .map(|worktree| worktree_to_proto(&worktree))
2276 .collect::<Vec<_>>(),
2277 })
2278 }
2279
2280 async fn handle_create_worktree(
2281 this: Entity<Self>,
2282 envelope: TypedEnvelope<proto::GitCreateWorktree>,
2283 mut cx: AsyncApp,
2284 ) -> Result<proto::Ack> {
2285 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2286 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2287 let directory = PathBuf::from(envelope.payload.directory);
2288 let name = envelope.payload.name;
2289 let commit = envelope.payload.commit;
2290
2291 repository_handle
2292 .update(&mut cx, |repository_handle, _| {
2293 repository_handle.create_worktree(name, directory, commit)
2294 })
2295 .await??;
2296
2297 Ok(proto::Ack {})
2298 }
2299
2300 async fn handle_get_branches(
2301 this: Entity<Self>,
2302 envelope: TypedEnvelope<proto::GitGetBranches>,
2303 mut cx: AsyncApp,
2304 ) -> Result<proto::GitBranchesResponse> {
2305 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2306 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2307
2308 let branches = repository_handle
2309 .update(&mut cx, |repository_handle, _| repository_handle.branches())
2310 .await??;
2311
2312 Ok(proto::GitBranchesResponse {
2313 branches: branches
2314 .into_iter()
2315 .map(|branch| branch_to_proto(&branch))
2316 .collect::<Vec<_>>(),
2317 })
2318 }
2319 async fn handle_get_default_branch(
2320 this: Entity<Self>,
2321 envelope: TypedEnvelope<proto::GetDefaultBranch>,
2322 mut cx: AsyncApp,
2323 ) -> Result<proto::GetDefaultBranchResponse> {
2324 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2325 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2326
2327 let branch = repository_handle
2328 .update(&mut cx, |repository_handle, _| {
2329 repository_handle.default_branch(false)
2330 })
2331 .await??
2332 .map(Into::into);
2333
2334 Ok(proto::GetDefaultBranchResponse { branch })
2335 }
2336 async fn handle_create_branch(
2337 this: Entity<Self>,
2338 envelope: TypedEnvelope<proto::GitCreateBranch>,
2339 mut cx: AsyncApp,
2340 ) -> Result<proto::Ack> {
2341 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2342 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2343 let branch_name = envelope.payload.branch_name;
2344
2345 repository_handle
2346 .update(&mut cx, |repository_handle, _| {
2347 repository_handle.create_branch(branch_name, None)
2348 })
2349 .await??;
2350
2351 Ok(proto::Ack {})
2352 }
2353
2354 async fn handle_change_branch(
2355 this: Entity<Self>,
2356 envelope: TypedEnvelope<proto::GitChangeBranch>,
2357 mut cx: AsyncApp,
2358 ) -> Result<proto::Ack> {
2359 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2360 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2361 let branch_name = envelope.payload.branch_name;
2362
2363 repository_handle
2364 .update(&mut cx, |repository_handle, _| {
2365 repository_handle.change_branch(branch_name)
2366 })
2367 .await??;
2368
2369 Ok(proto::Ack {})
2370 }
2371
2372 async fn handle_rename_branch(
2373 this: Entity<Self>,
2374 envelope: TypedEnvelope<proto::GitRenameBranch>,
2375 mut cx: AsyncApp,
2376 ) -> Result<proto::Ack> {
2377 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2378 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2379 let branch = envelope.payload.branch;
2380 let new_name = envelope.payload.new_name;
2381
2382 repository_handle
2383 .update(&mut cx, |repository_handle, _| {
2384 repository_handle.rename_branch(branch, new_name)
2385 })
2386 .await??;
2387
2388 Ok(proto::Ack {})
2389 }
2390
2391 async fn handle_create_remote(
2392 this: Entity<Self>,
2393 envelope: TypedEnvelope<proto::GitCreateRemote>,
2394 mut cx: AsyncApp,
2395 ) -> Result<proto::Ack> {
2396 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2397 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2398 let remote_name = envelope.payload.remote_name;
2399 let remote_url = envelope.payload.remote_url;
2400
2401 repository_handle
2402 .update(&mut cx, |repository_handle, _| {
2403 repository_handle.create_remote(remote_name, remote_url)
2404 })
2405 .await??;
2406
2407 Ok(proto::Ack {})
2408 }
2409
2410 async fn handle_delete_branch(
2411 this: Entity<Self>,
2412 envelope: TypedEnvelope<proto::GitDeleteBranch>,
2413 mut cx: AsyncApp,
2414 ) -> Result<proto::Ack> {
2415 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2416 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2417 let branch_name = envelope.payload.branch_name;
2418
2419 repository_handle
2420 .update(&mut cx, |repository_handle, _| {
2421 repository_handle.delete_branch(branch_name)
2422 })
2423 .await??;
2424
2425 Ok(proto::Ack {})
2426 }
2427
2428 async fn handle_remove_remote(
2429 this: Entity<Self>,
2430 envelope: TypedEnvelope<proto::GitRemoveRemote>,
2431 mut cx: AsyncApp,
2432 ) -> Result<proto::Ack> {
2433 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2434 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2435 let remote_name = envelope.payload.remote_name;
2436
2437 repository_handle
2438 .update(&mut cx, |repository_handle, _| {
2439 repository_handle.remove_remote(remote_name)
2440 })
2441 .await??;
2442
2443 Ok(proto::Ack {})
2444 }
2445
2446 async fn handle_show(
2447 this: Entity<Self>,
2448 envelope: TypedEnvelope<proto::GitShow>,
2449 mut cx: AsyncApp,
2450 ) -> Result<proto::GitCommitDetails> {
2451 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2452 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2453
2454 let commit = repository_handle
2455 .update(&mut cx, |repository_handle, _| {
2456 repository_handle.show(envelope.payload.commit)
2457 })
2458 .await??;
2459 Ok(proto::GitCommitDetails {
2460 sha: commit.sha.into(),
2461 message: commit.message.into(),
2462 commit_timestamp: commit.commit_timestamp,
2463 author_email: commit.author_email.into(),
2464 author_name: commit.author_name.into(),
2465 })
2466 }
2467
2468 async fn handle_load_commit_diff(
2469 this: Entity<Self>,
2470 envelope: TypedEnvelope<proto::LoadCommitDiff>,
2471 mut cx: AsyncApp,
2472 ) -> Result<proto::LoadCommitDiffResponse> {
2473 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2474 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2475
2476 let commit_diff = repository_handle
2477 .update(&mut cx, |repository_handle, _| {
2478 repository_handle.load_commit_diff(envelope.payload.commit)
2479 })
2480 .await??;
2481 Ok(proto::LoadCommitDiffResponse {
2482 files: commit_diff
2483 .files
2484 .into_iter()
2485 .map(|file| proto::CommitFile {
2486 path: file.path.to_proto(),
2487 old_text: file.old_text,
2488 new_text: file.new_text,
2489 is_binary: file.is_binary,
2490 })
2491 .collect(),
2492 })
2493 }
2494
2495 async fn handle_file_history(
2496 this: Entity<Self>,
2497 envelope: TypedEnvelope<proto::GitFileHistory>,
2498 mut cx: AsyncApp,
2499 ) -> Result<proto::GitFileHistoryResponse> {
2500 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2501 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2502 let path = RepoPath::from_proto(&envelope.payload.path)?;
2503 let skip = envelope.payload.skip as usize;
2504 let limit = envelope.payload.limit.map(|l| l as usize);
2505
2506 let file_history = repository_handle
2507 .update(&mut cx, |repository_handle, _| {
2508 repository_handle.file_history_paginated(path, skip, limit)
2509 })
2510 .await??;
2511
2512 Ok(proto::GitFileHistoryResponse {
2513 entries: file_history
2514 .entries
2515 .into_iter()
2516 .map(|entry| proto::FileHistoryEntry {
2517 sha: entry.sha.to_string(),
2518 subject: entry.subject.to_string(),
2519 message: entry.message.to_string(),
2520 commit_timestamp: entry.commit_timestamp,
2521 author_name: entry.author_name.to_string(),
2522 author_email: entry.author_email.to_string(),
2523 })
2524 .collect(),
2525 path: file_history.path.to_proto(),
2526 })
2527 }
2528
2529 async fn handle_reset(
2530 this: Entity<Self>,
2531 envelope: TypedEnvelope<proto::GitReset>,
2532 mut cx: AsyncApp,
2533 ) -> Result<proto::Ack> {
2534 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2535 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2536
2537 let mode = match envelope.payload.mode() {
2538 git_reset::ResetMode::Soft => ResetMode::Soft,
2539 git_reset::ResetMode::Mixed => ResetMode::Mixed,
2540 };
2541
2542 repository_handle
2543 .update(&mut cx, |repository_handle, cx| {
2544 repository_handle.reset(envelope.payload.commit, mode, cx)
2545 })
2546 .await??;
2547 Ok(proto::Ack {})
2548 }
2549
2550 async fn handle_checkout_files(
2551 this: Entity<Self>,
2552 envelope: TypedEnvelope<proto::GitCheckoutFiles>,
2553 mut cx: AsyncApp,
2554 ) -> Result<proto::Ack> {
2555 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2556 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2557 let paths = envelope
2558 .payload
2559 .paths
2560 .iter()
2561 .map(|s| RepoPath::from_proto(s))
2562 .collect::<Result<Vec<_>>>()?;
2563
2564 repository_handle
2565 .update(&mut cx, |repository_handle, cx| {
2566 repository_handle.checkout_files(&envelope.payload.commit, paths, cx)
2567 })
2568 .await?;
2569 Ok(proto::Ack {})
2570 }
2571
    /// Remote-request handler: opens the repository's commit-message buffer
    /// and replicates it to the requesting peer, returning the buffer's remote
    /// id so the peer can bind it to its own commit UI.
    async fn handle_open_commit_message_buffer(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::OpenCommitMessageBuffer>,
        mut cx: AsyncApp,
    ) -> Result<proto::OpenBufferResponse> {
        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
        let repository = Self::repository_for_request(&this, repository_id, &mut cx)?;
        let buffer = repository
            .update(&mut cx, |repository, cx| {
                repository.open_commit_buffer(None, this.read(cx).buffer_store.clone(), cx)
            })
            .await?;

        let buffer_id = buffer.read_with(&cx, |buffer, _| buffer.remote_id());
        this.update(&mut cx, |this, cx| {
            this.buffer_store.update(cx, |buffer_store, cx| {
                // Replicate to the original requester, falling back to the
                // direct sender; replication runs detached and failures are
                // only logged.
                buffer_store
                    .create_buffer_for_peer(
                        &buffer,
                        envelope.original_sender_id.unwrap_or(envelope.sender_id),
                        cx,
                    )
                    .detach_and_log_err(cx);
            })
        });

        Ok(proto::OpenBufferResponse {
            buffer_id: buffer_id.to_proto(),
        })
    }
2602
    /// Remote-request handler: relays a credential prompt from a git
    /// operation to the registered askpass delegate and returns the answer.
    async fn handle_askpass(
        this: Entity<Self>,
        envelope: TypedEnvelope<proto::AskPassRequest>,
        mut cx: AsyncApp,
    ) -> Result<proto::AskPassResponse> {
        let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
        let repository = Self::repository_for_request(&this, repository_id, &mut cx)?;

        let delegates = cx.update(|cx| repository.read(cx).askpass_delegates.clone());
        // Take the delegate out of the shared map so the mutex is not held
        // across the `ask_password` await below.
        let Some(mut askpass) = delegates.lock().remove(&envelope.payload.askpass_id) else {
            debug_panic!("no askpass found");
            anyhow::bail!("no askpass found");
        };

        let response = askpass
            .ask_password(envelope.payload.prompt)
            .await
            .ok_or_else(|| anyhow::anyhow!("askpass cancelled"))?;

        // Put the delegate back so later prompts for the same operation can
        // find it again.
        delegates
            .lock()
            .insert(envelope.payload.askpass_id, askpass);

        // In fact, we don't quite know what we're doing here, as we're sending askpass password unencrypted, but..
        Ok(proto::AskPassResponse {
            response: response.decrypt(IKnowWhatIAmDoingAndIHaveReadTheDocs)?,
        })
    }
2631
2632 async fn handle_check_for_pushed_commits(
2633 this: Entity<Self>,
2634 envelope: TypedEnvelope<proto::CheckForPushedCommits>,
2635 mut cx: AsyncApp,
2636 ) -> Result<proto::CheckForPushedCommitsResponse> {
2637 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2638 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2639
2640 let branches = repository_handle
2641 .update(&mut cx, |repository_handle, _| {
2642 repository_handle.check_for_pushed_commits()
2643 })
2644 .await??;
2645 Ok(proto::CheckForPushedCommitsResponse {
2646 pushed_to: branches
2647 .into_iter()
2648 .map(|commit| commit.to_string())
2649 .collect(),
2650 })
2651 }
2652
2653 async fn handle_git_diff(
2654 this: Entity<Self>,
2655 envelope: TypedEnvelope<proto::GitDiff>,
2656 mut cx: AsyncApp,
2657 ) -> Result<proto::GitDiffResponse> {
2658 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2659 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2660 let diff_type = match envelope.payload.diff_type() {
2661 proto::git_diff::DiffType::HeadToIndex => DiffType::HeadToIndex,
2662 proto::git_diff::DiffType::HeadToWorktree => DiffType::HeadToWorktree,
2663 proto::git_diff::DiffType::MergeBase => {
2664 let base_ref = envelope
2665 .payload
2666 .merge_base_ref
2667 .ok_or_else(|| anyhow!("merge_base_ref is required for MergeBase diff type"))?;
2668 DiffType::MergeBase {
2669 base_ref: base_ref.into(),
2670 }
2671 }
2672 };
2673
2674 let mut diff = repository_handle
2675 .update(&mut cx, |repository_handle, cx| {
2676 repository_handle.diff(diff_type, cx)
2677 })
2678 .await??;
2679 const ONE_MB: usize = 1_000_000;
2680 if diff.len() > ONE_MB {
2681 diff = diff.chars().take(ONE_MB).collect()
2682 }
2683
2684 Ok(proto::GitDiffResponse { diff })
2685 }
2686
    /// Remote-request handler: computes which paths changed between two
    /// trees, either directly (`Since`) or relative to their merge base
    /// (`MergeBase`), and returns per-path statuses.
    async fn handle_tree_diff(
        this: Entity<Self>,
        request: TypedEnvelope<proto::GetTreeDiff>,
        mut cx: AsyncApp,
    ) -> Result<proto::GetTreeDiffResponse> {
        let repository_id = RepositoryId(request.payload.repository_id);
        let diff_type = if request.payload.is_merge {
            DiffTreeType::MergeBase {
                base: request.payload.base.into(),
                head: request.payload.head.into(),
            }
        } else {
            DiffTreeType::Since {
                base: request.payload.base.into(),
                head: request.payload.head.into(),
            }
        };

        let diff = this
            .update(&mut cx, |this, cx| {
                let repository = this.repositories().get(&repository_id)?;
                Some(repository.update(cx, |repo, cx| repo.diff_tree(diff_type, cx)))
            })
            .context("missing repository")?
            .await??;

        Ok(proto::GetTreeDiffResponse {
            entries: diff
                .entries
                .into_iter()
                .map(|(path, status)| proto::TreeDiffStatus {
                    path: path.as_ref().to_proto(),
                    // First match only inspects the variant (non-binding
                    // patterns), so `status` is still available below.
                    status: match status {
                        TreeDiffStatus::Added {} => proto::tree_diff_status::Status::Added.into(),
                        TreeDiffStatus::Modified { .. } => {
                            proto::tree_diff_status::Status::Modified.into()
                        }
                        TreeDiffStatus::Deleted { .. } => {
                            proto::tree_diff_status::Status::Deleted.into()
                        }
                    },
                    // Only deletions and modifications carry the pre-change
                    // blob oid; added files have no prior object to report.
                    oid: match status {
                        TreeDiffStatus::Deleted { old } | TreeDiffStatus::Modified { old } => {
                            Some(old.to_string())
                        }
                        TreeDiffStatus::Added => None,
                    },
                })
                .collect(),
        })
    }
2738
2739 async fn handle_get_blob_content(
2740 this: Entity<Self>,
2741 request: TypedEnvelope<proto::GetBlobContent>,
2742 mut cx: AsyncApp,
2743 ) -> Result<proto::GetBlobContentResponse> {
2744 let oid = git::Oid::from_str(&request.payload.oid)?;
2745 let repository_id = RepositoryId(request.payload.repository_id);
2746 let content = this
2747 .update(&mut cx, |this, cx| {
2748 let repository = this.repositories().get(&repository_id)?;
2749 Some(repository.update(cx, |repo, cx| repo.load_blob_content(oid, cx)))
2750 })
2751 .context("missing repository")?
2752 .await?;
2753 Ok(proto::GetBlobContentResponse { content })
2754 }
2755
2756 async fn handle_open_unstaged_diff(
2757 this: Entity<Self>,
2758 request: TypedEnvelope<proto::OpenUnstagedDiff>,
2759 mut cx: AsyncApp,
2760 ) -> Result<proto::OpenUnstagedDiffResponse> {
2761 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2762 let diff = this
2763 .update(&mut cx, |this, cx| {
2764 let buffer = this.buffer_store.read(cx).get(buffer_id)?;
2765 Some(this.open_unstaged_diff(buffer, cx))
2766 })
2767 .context("missing buffer")?
2768 .await?;
2769 this.update(&mut cx, |this, _| {
2770 let shared_diffs = this
2771 .shared_diffs
2772 .entry(request.original_sender_id.unwrap_or(request.sender_id))
2773 .or_default();
2774 shared_diffs.entry(buffer_id).or_default().unstaged = Some(diff.clone());
2775 });
2776 let staged_text = diff.read_with(&cx, |diff, cx| diff.base_text_string(cx));
2777 Ok(proto::OpenUnstagedDiffResponse { staged_text })
2778 }
2779
    /// Remote-request handler: opens the uncommitted diff for a buffer,
    /// records it as shared with the requesting peer, and replies with the
    /// base texts. The response `mode` tells the peer how to interpret them;
    /// `IndexMatchesHead` lets one text serve as both bases.
    async fn handle_open_uncommitted_diff(
        this: Entity<Self>,
        request: TypedEnvelope<proto::OpenUncommittedDiff>,
        mut cx: AsyncApp,
    ) -> Result<proto::OpenUncommittedDiffResponse> {
        let buffer_id = BufferId::new(request.payload.buffer_id)?;
        let diff = this
            .update(&mut cx, |this, cx| {
                let buffer = this.buffer_store.read(cx).get(buffer_id)?;
                Some(this.open_uncommitted_diff(buffer, cx))
            })
            .context("missing buffer")?
            .await?;
        // Track the diff per peer so later base-text updates can be forwarded.
        this.update(&mut cx, |this, _| {
            let shared_diffs = this
                .shared_diffs
                .entry(request.original_sender_id.unwrap_or(request.sender_id))
                .or_default();
            shared_diffs.entry(buffer_id).or_default().uncommitted = Some(diff.clone());
        });
        Ok(diff.read_with(&cx, |diff, cx| {
            use proto::open_uncommitted_diff_response::Mode;

            // The secondary diff's base text, when present, is the index
            // (staged) snapshot.
            let unstaged_diff = diff.secondary_diff();
            let index_snapshot = unstaged_diff.and_then(|diff| {
                let diff = diff.read(cx);
                diff.base_text_exists().then(|| diff.base_text(cx))
            });

            let mode;
            let staged_text;
            let committed_text;
            if diff.base_text_exists() {
                let committed_snapshot = diff.base_text(cx);
                committed_text = Some(committed_snapshot.text());
                if let Some(index_text) = index_snapshot {
                    if index_text.remote_id() == committed_snapshot.remote_id() {
                        // Index and HEAD share the same snapshot entity, so
                        // the committed text can be sent once for both.
                        mode = Mode::IndexMatchesHead;
                        staged_text = None;
                    } else {
                        mode = Mode::IndexAndHead;
                        staged_text = Some(index_text.text());
                    }
                } else {
                    // No index snapshot is available for this buffer.
                    mode = Mode::IndexAndHead;
                    staged_text = None;
                }
            } else {
                // No committed base text exists for this buffer.
                mode = Mode::IndexAndHead;
                committed_text = None;
                staged_text = index_snapshot.as_ref().map(|buffer| buffer.text());
            }

            proto::OpenUncommittedDiffResponse {
                committed_text,
                staged_text,
                mode: mode.into(),
            }
        }))
    }
2840
2841 async fn handle_update_diff_bases(
2842 this: Entity<Self>,
2843 request: TypedEnvelope<proto::UpdateDiffBases>,
2844 mut cx: AsyncApp,
2845 ) -> Result<()> {
2846 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2847 this.update(&mut cx, |this, cx| {
2848 if let Some(diff_state) = this.diffs.get_mut(&buffer_id)
2849 && let Some(buffer) = this.buffer_store.read(cx).get(buffer_id)
2850 {
2851 let buffer = buffer.read(cx).text_snapshot();
2852 diff_state.update(cx, |diff_state, cx| {
2853 diff_state.handle_base_texts_updated(buffer, request.payload, cx);
2854 })
2855 }
2856 });
2857 Ok(())
2858 }
2859
2860 async fn handle_blame_buffer(
2861 this: Entity<Self>,
2862 envelope: TypedEnvelope<proto::BlameBuffer>,
2863 mut cx: AsyncApp,
2864 ) -> Result<proto::BlameBufferResponse> {
2865 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2866 let version = deserialize_version(&envelope.payload.version);
2867 let buffer = this.read_with(&cx, |this, cx| {
2868 this.buffer_store.read(cx).get_existing(buffer_id)
2869 })?;
2870 buffer
2871 .update(&mut cx, |buffer, _| {
2872 buffer.wait_for_version(version.clone())
2873 })
2874 .await?;
2875 let blame = this
2876 .update(&mut cx, |this, cx| {
2877 this.blame_buffer(&buffer, Some(version), cx)
2878 })
2879 .await?;
2880 Ok(serialize_blame_buffer_response(blame))
2881 }
2882
2883 async fn handle_get_permalink_to_line(
2884 this: Entity<Self>,
2885 envelope: TypedEnvelope<proto::GetPermalinkToLine>,
2886 mut cx: AsyncApp,
2887 ) -> Result<proto::GetPermalinkToLineResponse> {
2888 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2889 // let version = deserialize_version(&envelope.payload.version);
2890 let selection = {
2891 let proto_selection = envelope
2892 .payload
2893 .selection
2894 .context("no selection to get permalink for defined")?;
2895 proto_selection.start as u32..proto_selection.end as u32
2896 };
2897 let buffer = this.read_with(&cx, |this, cx| {
2898 this.buffer_store.read(cx).get_existing(buffer_id)
2899 })?;
2900 let permalink = this
2901 .update(&mut cx, |this, cx| {
2902 this.get_permalink_to_line(&buffer, selection, cx)
2903 })
2904 .await?;
2905 Ok(proto::GetPermalinkToLineResponse {
2906 permalink: permalink.to_string(),
2907 })
2908 }
2909
2910 fn repository_for_request(
2911 this: &Entity<Self>,
2912 id: RepositoryId,
2913 cx: &mut AsyncApp,
2914 ) -> Result<Entity<Repository>> {
2915 this.read_with(cx, |this, _| {
2916 this.repositories
2917 .get(&id)
2918 .context("missing repository handle")
2919 .cloned()
2920 })
2921 }
2922
2923 pub fn repo_snapshots(&self, cx: &App) -> HashMap<RepositoryId, RepositorySnapshot> {
2924 self.repositories
2925 .iter()
2926 .map(|(id, repo)| (*id, repo.read(cx).snapshot.clone()))
2927 .collect()
2928 }
2929
    /// Groups a batch of updated worktree entries by the repository that owns
    /// each path, assigning a path nested in several repositories to its
    /// innermost one. The matching runs on the background executor.
    fn process_updated_entries(
        &self,
        worktree: &Entity<Worktree>,
        updated_entries: &[(Arc<RelPath>, ProjectEntryId, PathChange)],
        cx: &mut App,
    ) -> Task<HashMap<Entity<Repository>, Vec<RepoPath>>> {
        let path_style = worktree.read(cx).path_style();
        let mut repo_paths = self
            .repositories
            .values()
            .map(|repo| (repo.read(cx).work_directory_abs_path.clone(), repo.clone()))
            .collect::<Vec<_>>();
        let mut entries: Vec<_> = updated_entries
            .iter()
            .map(|(path, _, _)| path.clone())
            .collect();
        // Sorted order lets each repo claim its paths as one contiguous range.
        entries.sort();
        let worktree = worktree.read(cx);

        // Absolutize up front so the background task doesn't need the worktree.
        let entries = entries
            .into_iter()
            .map(|path| worktree.absolutize(&path))
            .collect::<Arc<[_]>>();

        let executor = cx.background_executor().clone();
        cx.background_executor().spawn(async move {
            repo_paths.sort_by(|lhs, rhs| lhs.0.cmp(&rhs.0));
            let mut paths_by_git_repo = HashMap::<_, Vec<_>>::default();
            let mut tasks = FuturesOrdered::new();
            // `.rev()` so inner (longer) repo paths are processed first; the
            // dedup pass below relies on this ordering.
            for (repo_path, repo) in repo_paths.into_iter().rev() {
                let entries = entries.clone();
                let task = executor.spawn(async move {
                    // Find all repository paths that belong to this repo
                    let mut ix = entries.partition_point(|path| path < &*repo_path);
                    if ix == entries.len() {
                        return None;
                    };

                    let mut paths = Vec::new();
                    // All paths prefixed by a given repo will constitute a continuous range.
                    while let Some(path) = entries.get(ix)
                        && let Some(repo_path) = RepositorySnapshot::abs_path_to_repo_path_inner(
                            &repo_path, path, path_style,
                        )
                    {
                        paths.push((repo_path, ix));
                        ix += 1;
                    }
                    if paths.is_empty() {
                        None
                    } else {
                        Some((repo, paths))
                    }
                });
                tasks.push_back(task);
            }

            // Now, let's filter out the "duplicate" entries that were processed by multiple distinct repos.
            let mut path_was_used = vec![false; entries.len()];
            let tasks = tasks.collect::<Vec<_>>().await;
            // Process tasks from the back: iterating backwards allows us to see more-specific paths first.
            // We always want to assign a path to it's innermost repository.
            for t in tasks {
                let Some((repo, paths)) = t else {
                    continue;
                };
                let entry = paths_by_git_repo.entry(repo).or_default();
                for (repo_path, ix) in paths {
                    // First claim wins; inner repos were processed first.
                    if path_was_used[ix] {
                        continue;
                    }
                    path_was_used[ix] = true;
                    entry.push(repo_path);
                }
            }

            paths_by_git_repo
        })
    }
3009}
3010
3011impl BufferGitState {
    /// Creates an empty per-buffer git state; diffs, base texts, and conflict
    /// tracking all start unset and are populated on demand.
    fn new(_git_store: WeakEntity<GitStore>) -> Self {
        Self {
            unstaged_diff: Default::default(),
            uncommitted_diff: Default::default(),
            oid_diffs: Default::default(),
            recalculate_diff_task: Default::default(),
            language: Default::default(),
            language_registry: Default::default(),
            // Watch channel starts at `false`: no diff recalculation in flight.
            recalculating_tx: postage::watch::channel_with(false).0,
            hunk_staging_operation_count: 0,
            hunk_staging_operation_count_as_of_write: 0,
            head_text: Default::default(),
            index_text: Default::default(),
            oid_texts: Default::default(),
            head_changed: Default::default(),
            index_changed: Default::default(),
            language_changed: Default::default(),
            conflict_updated_futures: Default::default(),
            conflict_set: Default::default(),
            reparse_conflict_markers_task: Default::default(),
        }
    }
3034
3035 #[ztracing::instrument(skip_all)]
3036 fn buffer_language_changed(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) {
3037 self.language = buffer.read(cx).language().cloned();
3038 self.language_changed = true;
3039 let _ = self.recalculate_diffs(buffer.read(cx).text_snapshot(), cx);
3040 }
3041
    /// Re-parses merge-conflict markers in `buffer` on the background
    /// executor and updates the associated `ConflictSet`.
    ///
    /// Returns a receiver that fires once the conflict set has been updated.
    /// When there is no live conflict set, or no conflicts were previously
    /// present, no reparse is scheduled and the sender is dropped, so the
    /// receiver resolves as canceled.
    fn reparse_conflict_markers(
        &mut self,
        buffer: text::BufferSnapshot,
        cx: &mut Context<Self>,
    ) -> oneshot::Receiver<()> {
        let (tx, rx) = oneshot::channel();

        let Some(conflict_set) = self
            .conflict_set
            .as_ref()
            .and_then(|conflict_set| conflict_set.upgrade())
        else {
            return rx;
        };

        // Only reparse when the buffer previously had conflicts; otherwise
        // there is no old snapshot to compare against.
        let old_snapshot = conflict_set.read_with(cx, |conflict_set, _| {
            if conflict_set.has_conflict {
                Some(conflict_set.snapshot())
            } else {
                None
            }
        });

        if let Some(old_snapshot) = old_snapshot {
            self.conflict_updated_futures.push(tx);
            self.reparse_conflict_markers_task = Some(cx.spawn(async move |this, cx| {
                // Parse and diff off the main thread; only the new snapshot
                // and the changed range come back.
                let (snapshot, changed_range) = cx
                    .background_spawn(async move {
                        let new_snapshot = ConflictSet::parse(&buffer);
                        let changed_range = old_snapshot.compare(&new_snapshot, &buffer);
                        (new_snapshot, changed_range)
                    })
                    .await;
                this.update(cx, |this, cx| {
                    if let Some(conflict_set) = &this.conflict_set {
                        conflict_set
                            .update(cx, |conflict_set, cx| {
                                conflict_set.set_snapshot(snapshot, changed_range, cx);
                            })
                            .ok();
                    }
                    // Wake every registered waiter, not just this call's.
                    let futures = std::mem::take(&mut this.conflict_updated_futures);
                    for tx in futures {
                        tx.send(()).ok();
                    }
                })
            }))
        }

        rx
    }
3093
3094 fn unstaged_diff(&self) -> Option<Entity<BufferDiff>> {
3095 self.unstaged_diff.as_ref().and_then(|set| set.upgrade())
3096 }
3097
3098 fn uncommitted_diff(&self) -> Option<Entity<BufferDiff>> {
3099 self.uncommitted_diff.as_ref().and_then(|set| set.upgrade())
3100 }
3101
3102 fn oid_diff(&self, oid: Option<git::Oid>) -> Option<Entity<BufferDiff>> {
3103 self.oid_diffs.get(&oid).and_then(|weak| weak.upgrade())
3104 }
3105
3106 fn handle_base_texts_updated(
3107 &mut self,
3108 buffer: text::BufferSnapshot,
3109 message: proto::UpdateDiffBases,
3110 cx: &mut Context<Self>,
3111 ) {
3112 use proto::update_diff_bases::Mode;
3113
3114 let Some(mode) = Mode::from_i32(message.mode) else {
3115 return;
3116 };
3117
3118 let diff_bases_change = match mode {
3119 Mode::HeadOnly => DiffBasesChange::SetHead(message.committed_text),
3120 Mode::IndexOnly => DiffBasesChange::SetIndex(message.staged_text),
3121 Mode::IndexMatchesHead => DiffBasesChange::SetBoth(message.committed_text),
3122 Mode::IndexAndHead => DiffBasesChange::SetEach {
3123 index: message.staged_text,
3124 head: message.committed_text,
3125 },
3126 };
3127
3128 self.diff_bases_changed(buffer, Some(diff_bases_change), cx);
3129 }
3130
3131 pub fn wait_for_recalculation(&mut self) -> Option<impl Future<Output = ()> + use<>> {
3132 if *self.recalculating_tx.borrow() {
3133 let mut rx = self.recalculating_tx.subscribe();
3134 Some(async move {
3135 loop {
3136 let is_recalculating = rx.recv().await;
3137 if is_recalculating != Some(true) {
3138 break;
3139 }
3140 }
3141 })
3142 } else {
3143 None
3144 }
3145 }
3146
3147 fn diff_bases_changed(
3148 &mut self,
3149 buffer: text::BufferSnapshot,
3150 diff_bases_change: Option<DiffBasesChange>,
3151 cx: &mut Context<Self>,
3152 ) {
3153 match diff_bases_change {
3154 Some(DiffBasesChange::SetIndex(index)) => {
3155 self.index_text = index.map(|mut index| {
3156 text::LineEnding::normalize(&mut index);
3157 Arc::from(index.as_str())
3158 });
3159 self.index_changed = true;
3160 }
3161 Some(DiffBasesChange::SetHead(head)) => {
3162 self.head_text = head.map(|mut head| {
3163 text::LineEnding::normalize(&mut head);
3164 Arc::from(head.as_str())
3165 });
3166 self.head_changed = true;
3167 }
3168 Some(DiffBasesChange::SetBoth(text)) => {
3169 let text = text.map(|mut text| {
3170 text::LineEnding::normalize(&mut text);
3171 Arc::from(text.as_str())
3172 });
3173 self.head_text = text.clone();
3174 self.index_text = text;
3175 self.head_changed = true;
3176 self.index_changed = true;
3177 }
3178 Some(DiffBasesChange::SetEach { index, head }) => {
3179 self.index_text = index.map(|mut index| {
3180 text::LineEnding::normalize(&mut index);
3181 Arc::from(index.as_str())
3182 });
3183 self.index_changed = true;
3184 self.head_text = head.map(|mut head| {
3185 text::LineEnding::normalize(&mut head);
3186 Arc::from(head.as_str())
3187 });
3188 self.head_changed = true;
3189 }
3190 None => {}
3191 }
3192
3193 self.recalculate_diffs(buffer, cx)
3194 }
3195
3196 #[ztracing::instrument(skip_all)]
3197 fn recalculate_diffs(&mut self, buffer: text::BufferSnapshot, cx: &mut Context<Self>) {
3198 *self.recalculating_tx.borrow_mut() = true;
3199
3200 let language = self.language.clone();
3201 let language_registry = self.language_registry.clone();
3202 let unstaged_diff = self.unstaged_diff();
3203 let uncommitted_diff = self.uncommitted_diff();
3204 let head = self.head_text.clone();
3205 let index = self.index_text.clone();
3206 let index_changed = self.index_changed;
3207 let head_changed = self.head_changed;
3208 let language_changed = self.language_changed;
3209 let prev_hunk_staging_operation_count = self.hunk_staging_operation_count_as_of_write;
3210 let index_matches_head = match (self.index_text.as_ref(), self.head_text.as_ref()) {
3211 (Some(index), Some(head)) => Arc::ptr_eq(index, head),
3212 (None, None) => true,
3213 _ => false,
3214 };
3215
3216 let oid_diffs: Vec<(Option<git::Oid>, Entity<BufferDiff>, Option<Arc<str>>)> = self
3217 .oid_diffs
3218 .iter()
3219 .filter_map(|(oid, weak)| {
3220 let base_text = oid.and_then(|oid| self.oid_texts.get(&oid).cloned());
3221 weak.upgrade().map(|diff| (*oid, diff, base_text))
3222 })
3223 .collect();
3224
3225 self.oid_diffs.retain(|oid, weak| {
3226 let alive = weak.upgrade().is_some();
3227 if !alive {
3228 if let Some(oid) = oid {
3229 self.oid_texts.remove(oid);
3230 }
3231 }
3232 alive
3233 });
3234 self.recalculate_diff_task = Some(cx.spawn(async move |this, cx| {
3235 log::debug!(
3236 "start recalculating diffs for buffer {}",
3237 buffer.remote_id()
3238 );
3239
3240 let mut new_unstaged_diff = None;
3241 if let Some(unstaged_diff) = &unstaged_diff {
3242 new_unstaged_diff = Some(
3243 cx.update(|cx| {
3244 unstaged_diff.read(cx).update_diff(
3245 buffer.clone(),
3246 index,
3247 index_changed.then_some(false),
3248 language.clone(),
3249 cx,
3250 )
3251 })
3252 .await,
3253 );
3254 }
3255
3256 // Dropping BufferDiff can be expensive, so yield back to the event loop
3257 // for a bit
3258 yield_now().await;
3259
3260 let mut new_uncommitted_diff = None;
3261 if let Some(uncommitted_diff) = &uncommitted_diff {
3262 new_uncommitted_diff = if index_matches_head {
3263 new_unstaged_diff.clone()
3264 } else {
3265 Some(
3266 cx.update(|cx| {
3267 uncommitted_diff.read(cx).update_diff(
3268 buffer.clone(),
3269 head,
3270 head_changed.then_some(true),
3271 language.clone(),
3272 cx,
3273 )
3274 })
3275 .await,
3276 )
3277 }
3278 }
3279
3280 // Dropping BufferDiff can be expensive, so yield back to the event loop
3281 // for a bit
3282 yield_now().await;
3283
3284 let cancel = this.update(cx, |this, _| {
3285 // This checks whether all pending stage/unstage operations
3286 // have quiesced (i.e. both the corresponding write and the
3287 // read of that write have completed). If not, then we cancel
3288 // this recalculation attempt to avoid invalidating pending
3289 // state too quickly; another recalculation will come along
3290 // later and clear the pending state once the state of the index has settled.
3291 if this.hunk_staging_operation_count > prev_hunk_staging_operation_count {
3292 *this.recalculating_tx.borrow_mut() = false;
3293 true
3294 } else {
3295 false
3296 }
3297 })?;
3298 if cancel {
3299 log::debug!(
3300 concat!(
3301 "aborting recalculating diffs for buffer {}",
3302 "due to subsequent hunk operations",
3303 ),
3304 buffer.remote_id()
3305 );
3306 return Ok(());
3307 }
3308
3309 let unstaged_changed_range = if let Some((unstaged_diff, new_unstaged_diff)) =
3310 unstaged_diff.as_ref().zip(new_unstaged_diff.clone())
3311 {
3312 let task = unstaged_diff.update(cx, |diff, cx| {
3313 // For git index buffer we skip assigning the language as we do not really need to perform any syntax highlighting on
3314 // it. As a result, by skipping it we are potentially shaving off a lot of RSS plus we get a snappier feel for large diff
3315 // view multibuffers.
3316 diff.set_snapshot(new_unstaged_diff, &buffer, cx)
3317 });
3318 Some(task.await)
3319 } else {
3320 None
3321 };
3322
3323 yield_now().await;
3324
3325 if let Some((uncommitted_diff, new_uncommitted_diff)) =
3326 uncommitted_diff.as_ref().zip(new_uncommitted_diff.clone())
3327 {
3328 uncommitted_diff
3329 .update(cx, |diff, cx| {
3330 if language_changed {
3331 diff.language_changed(language.clone(), language_registry, cx);
3332 }
3333 diff.set_snapshot_with_secondary(
3334 new_uncommitted_diff,
3335 &buffer,
3336 unstaged_changed_range.flatten(),
3337 true,
3338 cx,
3339 )
3340 })
3341 .await;
3342 }
3343
3344 yield_now().await;
3345
3346 for (oid, oid_diff, base_text) in oid_diffs {
3347 let new_oid_diff = cx
3348 .update(|cx| {
3349 oid_diff.read(cx).update_diff(
3350 buffer.clone(),
3351 base_text,
3352 None,
3353 language.clone(),
3354 cx,
3355 )
3356 })
3357 .await;
3358
3359 oid_diff
3360 .update(cx, |diff, cx| diff.set_snapshot(new_oid_diff, &buffer, cx))
3361 .await;
3362
3363 log::debug!(
3364 "finished recalculating oid diff for buffer {} oid {:?}",
3365 buffer.remote_id(),
3366 oid
3367 );
3368
3369 yield_now().await;
3370 }
3371
3372 log::debug!(
3373 "finished recalculating diffs for buffer {}",
3374 buffer.remote_id()
3375 );
3376
3377 if let Some(this) = this.upgrade() {
3378 this.update(cx, |this, _| {
3379 this.index_changed = false;
3380 this.head_changed = false;
3381 this.language_changed = false;
3382 *this.recalculating_tx.borrow_mut() = false;
3383 });
3384 }
3385
3386 Ok(())
3387 }));
3388 }
3389}
3390
3391fn make_remote_delegate(
3392 this: Entity<GitStore>,
3393 project_id: u64,
3394 repository_id: RepositoryId,
3395 askpass_id: u64,
3396 cx: &mut AsyncApp,
3397) -> AskPassDelegate {
3398 AskPassDelegate::new(cx, move |prompt, tx, cx| {
3399 this.update(cx, |this, cx| {
3400 let Some((client, _)) = this.downstream_client() else {
3401 return;
3402 };
3403 let response = client.request(proto::AskPassRequest {
3404 project_id,
3405 repository_id: repository_id.to_proto(),
3406 askpass_id,
3407 prompt,
3408 });
3409 cx.spawn(async move |_, _| {
3410 let mut response = response.await?.response;
3411 tx.send(EncryptedPassword::try_from(response.as_ref())?)
3412 .ok();
3413 response.zeroize();
3414 anyhow::Ok(())
3415 })
3416 .detach_and_log_err(cx);
3417 });
3418 })
3419}
3420
3421impl RepositoryId {
3422 pub fn to_proto(self) -> u64 {
3423 self.0
3424 }
3425
3426 pub fn from_proto(id: u64) -> Self {
3427 RepositoryId(id)
3428 }
3429}
3430
impl RepositorySnapshot {
    /// Creates an empty snapshot for a repository that has not been scanned yet
    /// (scan_id 0, no statuses, no branch or head commit).
    fn empty(id: RepositoryId, work_directory_abs_path: Arc<Path>, path_style: PathStyle) -> Self {
        Self {
            id,
            statuses_by_path: Default::default(),
            work_directory_abs_path,
            branch: None,
            head_commit: None,
            scan_id: 0,
            merge: Default::default(),
            remote_origin_url: None,
            remote_upstream_url: None,
            stash_entries: Default::default(),
            path_style,
        }
    }

    /// Builds the first `UpdateRepository` message for a downstream client,
    /// carrying the complete current state (all statuses, nothing removed).
    fn initial_update(&self, project_id: u64) -> proto::UpdateRepository {
        proto::UpdateRepository {
            branch_summary: self.branch.as_ref().map(branch_to_proto),
            head_commit_details: self.head_commit.as_ref().map(commit_details_to_proto),
            // Every known status entry is sent; there is no previous state
            // on the other side to diff against.
            updated_statuses: self
                .statuses_by_path
                .iter()
                .map(|entry| entry.to_proto())
                .collect(),
            removed_statuses: Default::default(),
            current_merge_conflicts: self
                .merge
                .conflicted_paths
                .iter()
                .map(|repo_path| repo_path.to_proto())
                .collect(),
            merge_message: self.merge.message.as_ref().map(|msg| msg.to_string()),
            project_id,
            id: self.id.to_proto(),
            abs_path: self.work_directory_abs_path.to_string_lossy().into_owned(),
            // NOTE(review): the repository id doubles as its entry id here —
            // confirm against the proto schema / consumers.
            entry_ids: vec![self.id.to_proto()],
            scan_id: self.scan_id,
            is_last_update: true,
            stash_entries: self
                .stash_entries
                .entries
                .iter()
                .map(stash_to_proto)
                .collect(),
            remote_upstream_url: self.remote_upstream_url.clone(),
            remote_origin_url: self.remote_origin_url.clone(),
        }
    }

    /// Builds an incremental `UpdateRepository` message describing the
    /// difference between `old` and `self` (statuses added/changed vs. removed).
    fn build_update(&self, old: &Self, project_id: u64) -> proto::UpdateRepository {
        let mut updated_statuses: Vec<proto::StatusEntry> = Vec::new();
        let mut removed_statuses: Vec<String> = Vec::new();

        // Both status trees iterate in repo-path order, so a single sorted
        // merge walk classifies every entry as added, changed, or removed.
        let mut new_statuses = self.statuses_by_path.iter().peekable();
        let mut old_statuses = old.statuses_by_path.iter().peekable();

        let mut current_new_entry = new_statuses.next();
        let mut current_old_entry = old_statuses.next();
        loop {
            match (current_new_entry, current_old_entry) {
                (Some(new_entry), Some(old_entry)) => {
                    match new_entry.repo_path.cmp(&old_entry.repo_path) {
                        // Path only exists in the new snapshot: added.
                        Ordering::Less => {
                            updated_statuses.push(new_entry.to_proto());
                            current_new_entry = new_statuses.next();
                        }
                        // Path exists in both: emit only if the status changed.
                        Ordering::Equal => {
                            if new_entry.status != old_entry.status {
                                updated_statuses.push(new_entry.to_proto());
                            }
                            current_old_entry = old_statuses.next();
                            current_new_entry = new_statuses.next();
                        }
                        // Path only exists in the old snapshot: removed.
                        Ordering::Greater => {
                            removed_statuses.push(old_entry.repo_path.to_proto());
                            current_old_entry = old_statuses.next();
                        }
                    }
                }
                // Old snapshot exhausted on one side: drain the remainder.
                (None, Some(old_entry)) => {
                    removed_statuses.push(old_entry.repo_path.to_proto());
                    current_old_entry = old_statuses.next();
                }
                (Some(new_entry), None) => {
                    updated_statuses.push(new_entry.to_proto());
                    current_new_entry = new_statuses.next();
                }
                (None, None) => break,
            }
        }

        proto::UpdateRepository {
            branch_summary: self.branch.as_ref().map(branch_to_proto),
            head_commit_details: self.head_commit.as_ref().map(commit_details_to_proto),
            updated_statuses,
            removed_statuses,
            current_merge_conflicts: self
                .merge
                .conflicted_paths
                .iter()
                .map(|path| path.to_proto())
                .collect(),
            merge_message: self.merge.message.as_ref().map(|msg| msg.to_string()),
            project_id,
            id: self.id.to_proto(),
            abs_path: self.work_directory_abs_path.to_string_lossy().into_owned(),
            entry_ids: vec![],
            scan_id: self.scan_id,
            is_last_update: true,
            stash_entries: self
                .stash_entries
                .entries
                .iter()
                .map(stash_to_proto)
                .collect(),
            remote_upstream_url: self.remote_upstream_url.clone(),
            remote_origin_url: self.remote_origin_url.clone(),
        }
    }

    /// Iterates over all cached status entries, in repo-path order.
    pub fn status(&self) -> impl Iterator<Item = StatusEntry> + '_ {
        self.statuses_by_path.iter().cloned()
    }

    /// Returns the aggregate git status summary over all entries.
    pub fn status_summary(&self) -> GitSummary {
        self.statuses_by_path.summary().item_summary
    }

    /// Looks up the cached status for a single repo-relative path.
    pub fn status_for_path(&self, path: &RepoPath) -> Option<StatusEntry> {
        self.statuses_by_path
            .get(&PathKey(path.as_ref().clone()), ())
            .cloned()
    }

    /// Converts an absolute path into a path relative to this repository's
    /// working directory, or `None` if it lies outside the repository.
    pub fn abs_path_to_repo_path(&self, abs_path: &Path) -> Option<RepoPath> {
        Self::abs_path_to_repo_path_inner(&self.work_directory_abs_path, abs_path, self.path_style)
    }

    /// Converts a repo-relative path back into an absolute path.
    fn repo_path_to_abs_path(&self, repo_path: &RepoPath) -> PathBuf {
        // Joining an absolute work directory with a repo-relative path is
        // assumed to be infallible — TODO confirm `join`'s failure modes.
        self.path_style
            .join(&self.work_directory_abs_path, repo_path.as_std_path())
            .unwrap()
            .into()
    }

    /// Shared helper: strips the work-directory prefix using the snapshot's
    /// path style (handles platform-specific separators).
    #[inline]
    fn abs_path_to_repo_path_inner(
        work_directory_abs_path: &Path,
        abs_path: &Path,
        path_style: PathStyle,
    ) -> Option<RepoPath> {
        let rel_path = path_style.strip_prefix(abs_path, work_directory_abs_path)?;
        Some(RepoPath::from_rel_path(&rel_path))
    }

    /// Whether `repo_path` was conflicted at the time the merge heads last
    /// changed (even if the conflict has since been resolved in the worktree).
    pub fn had_conflict_on_last_merge_head_change(&self, repo_path: &RepoPath) -> bool {
        self.merge.conflicted_paths.contains(repo_path)
    }

    /// Whether `repo_path` is conflicted, either currently (per git status) or
    /// as of the last merge-head change.
    pub fn has_conflict(&self, repo_path: &RepoPath) -> bool {
        let had_conflict_on_last_merge_head_change =
            self.merge.conflicted_paths.contains(repo_path);
        let has_conflict_currently = self
            .status_for_path(repo_path)
            .is_some_and(|entry| entry.status.is_conflicted());
        had_conflict_on_last_merge_head_change || has_conflict_currently
    }

    /// This is the name that will be displayed in the repository selector for this repository.
    pub fn display_name(&self) -> SharedString {
        self.work_directory_abs_path
            .file_name()
            .unwrap_or_default()
            .to_string_lossy()
            .to_string()
            .into()
    }
}
3611
3612pub fn stash_to_proto(entry: &StashEntry) -> proto::StashEntry {
3613 proto::StashEntry {
3614 oid: entry.oid.as_bytes().to_vec(),
3615 message: entry.message.clone(),
3616 branch: entry.branch.clone(),
3617 index: entry.index as u64,
3618 timestamp: entry.timestamp,
3619 }
3620}
3621
3622pub fn proto_to_stash(entry: &proto::StashEntry) -> Result<StashEntry> {
3623 Ok(StashEntry {
3624 oid: Oid::from_bytes(&entry.oid)?,
3625 message: entry.message.clone(),
3626 index: entry.index as usize,
3627 branch: entry.branch.clone(),
3628 timestamp: entry.timestamp,
3629 })
3630}
3631
impl MergeDetails {
    /// Computes the current merge state — merge message, in-progress operation
    /// heads, and conflicted paths — from the git backend plus the freshly
    /// scanned `status` tree.
    ///
    /// Returns the new `MergeDetails` and a flag reporting whether the merge
    /// heads changed relative to `prev_snapshot`.
    async fn load(
        backend: &Arc<dyn GitRepository>,
        status: &SumTree<StatusEntry>,
        prev_snapshot: &RepositorySnapshot,
    ) -> Result<(MergeDetails, bool)> {
        log::debug!("load merge details");
        let message = backend.merge_message().await;
        // Resolve the head of every kind of in-progress operation in one
        // batch; a failed batch is treated as "no heads".
        let heads = backend
            .revparse_batch(vec![
                "MERGE_HEAD".into(),
                "CHERRY_PICK_HEAD".into(),
                "REBASE_HEAD".into(),
                "REVERT_HEAD".into(),
                "APPLY_HEAD".into(),
            ])
            .await
            .log_err()
            .unwrap_or_default()
            .into_iter()
            .map(|opt| opt.map(SharedString::from))
            .collect::<Vec<_>>();
        let merge_heads_changed = heads != prev_snapshot.merge.heads;
        // Conflicted paths are only re-derived when the merge heads change;
        // otherwise the previous set is carried forward unchanged.
        let conflicted_paths = if merge_heads_changed {
            let current_conflicted_paths = TreeSet::from_ordered_entries(
                status
                    .iter()
                    .filter(|entry| entry.status.is_conflicted())
                    .map(|entry| entry.repo_path.clone()),
            );

            // It can happen that we run a scan while a lengthy merge is in progress
            // that will eventually result in conflicts, but before those conflicts
            // are reported by `git status`. Since for the moment we only care about
            // the merge heads state for the purposes of tracking conflicts, don't update
            // this state until we see some conflicts.
            if heads.iter().any(Option::is_some)
                && !prev_snapshot.merge.heads.iter().any(Option::is_some)
                && current_conflicted_paths.is_empty()
            {
                log::debug!("not updating merge heads because no conflicts found");
                // Keep the previous merge state (only refreshing the message)
                // and report "no change" to the caller.
                return Ok((
                    MergeDetails {
                        message: message.map(SharedString::from),
                        ..prev_snapshot.merge.clone()
                    },
                    false,
                ));
            }

            current_conflicted_paths
        } else {
            prev_snapshot.merge.conflicted_paths.clone()
        };
        let details = MergeDetails {
            conflicted_paths,
            message: message.map(SharedString::from),
            heads,
        };
        Ok((details, merge_heads_changed))
    }
}
3694
3695impl Repository {
3696 pub fn snapshot(&self) -> RepositorySnapshot {
3697 self.snapshot.clone()
3698 }
3699
3700 pub fn pending_ops(&self) -> impl Iterator<Item = PendingOps> + '_ {
3701 self.pending_ops.iter().cloned()
3702 }
3703
3704 pub fn pending_ops_summary(&self) -> PathSummary<PendingOpsSummary> {
3705 self.pending_ops.summary().clone()
3706 }
3707
3708 pub fn pending_ops_for_path(&self, path: &RepoPath) -> Option<PendingOps> {
3709 self.pending_ops
3710 .get(&PathKey(path.as_ref().clone()), ())
3711 .cloned()
3712 }
3713
    /// Constructs a repository backed by a local git working directory.
    ///
    /// The `LocalRepositoryState` is initialized asynchronously; the worker and
    /// `repository_state` share the same future, so jobs queued before
    /// initialization completes simply wait on it.
    fn local(
        id: RepositoryId,
        work_directory_abs_path: Arc<Path>,
        dot_git_abs_path: Arc<Path>,
        project_environment: WeakEntity<ProjectEnvironment>,
        fs: Arc<dyn Fs>,
        git_store: WeakEntity<GitStore>,
        cx: &mut Context<Self>,
    ) -> Self {
        let snapshot =
            RepositorySnapshot::empty(id, work_directory_abs_path.clone(), PathStyle::local());
        // Kick off state initialization. Errors are stringified so the shared
        // future's output can be cloned by every awaiter.
        let state = cx
            .spawn(async move |_, cx| {
                LocalRepositoryState::new(
                    work_directory_abs_path,
                    dot_git_abs_path,
                    project_environment,
                    fs,
                    cx,
                )
                .await
                .map_err(|err| err.to_string())
            })
            .shared();
        let job_sender = Repository::spawn_local_git_worker(state.clone(), cx);
        // Wrap the same shared state future in the `RepositoryState` enum used
        // by the job API.
        let state = cx
            .spawn(async move |_, _| {
                let state = state.await?;
                Ok(RepositoryState::Local(state))
            })
            .shared();

        // After the initial scan, branch or merge-head changes invalidate the
        // cached commit-graph data.
        cx.subscribe_self(move |this, event: &RepositoryEvent, _| match event {
            RepositoryEvent::BranchChanged | RepositoryEvent::MergeHeadsChanged => {
                if this.scan_id > 1 {
                    this.initial_graph_data.clear();
                }
            }
            _ => {}
        })
        .detach();

        Repository {
            this: cx.weak_entity(),
            git_store,
            snapshot,
            pending_ops: Default::default(),
            repository_state: state,
            commit_message_buffer: None,
            askpass_delegates: Default::default(),
            paths_needing_status_update: Default::default(),
            latest_askpass_id: 0,
            job_sender,
            job_id: 0,
            active_jobs: Default::default(),
            initial_graph_data: Default::default(),
            commit_data: Default::default(),
            graph_commit_data_handler: GraphCommitHandlerState::Closed,
        }
    }
3774
3775 fn remote(
3776 id: RepositoryId,
3777 work_directory_abs_path: Arc<Path>,
3778 path_style: PathStyle,
3779 project_id: ProjectId,
3780 client: AnyProtoClient,
3781 git_store: WeakEntity<GitStore>,
3782 cx: &mut Context<Self>,
3783 ) -> Self {
3784 let snapshot = RepositorySnapshot::empty(id, work_directory_abs_path, path_style);
3785 let repository_state = RemoteRepositoryState { project_id, client };
3786 let job_sender = Self::spawn_remote_git_worker(repository_state.clone(), cx);
3787 let repository_state = Task::ready(Ok(RepositoryState::Remote(repository_state))).shared();
3788 Self {
3789 this: cx.weak_entity(),
3790 snapshot,
3791 commit_message_buffer: None,
3792 git_store,
3793 pending_ops: Default::default(),
3794 paths_needing_status_update: Default::default(),
3795 job_sender,
3796 repository_state,
3797 askpass_delegates: Default::default(),
3798 latest_askpass_id: 0,
3799 active_jobs: Default::default(),
3800 job_id: 0,
3801 initial_graph_data: Default::default(),
3802 commit_data: Default::default(),
3803 graph_commit_data_handler: GraphCommitHandlerState::Closed,
3804 }
3805 }
3806
    /// Upgrades the weak handle to the owning `GitStore`, if it is still alive.
    pub fn git_store(&self) -> Option<Entity<GitStore>> {
        self.git_store.upgrade()
    }
3810
    /// Re-reads the index and HEAD base texts for every open buffer belonging
    /// to this repository and pushes any changes into the buffers' diff state,
    /// mirroring them to the downstream client when connected.
    ///
    /// Only bases that are actually in use (whose unstaged/uncommitted diff is
    /// still upgradable) are reloaded. Runs as a keyed job, so repeated
    /// requests are handled by the worker under a single key.
    fn reload_buffer_diff_bases(&mut self, cx: &mut Context<Self>) {
        let this = cx.weak_entity();
        let git_store = self.git_store.clone();
        let _ = self.send_keyed_job(
            Some(GitJobKey::ReloadBufferDiffBases),
            None,
            |state, mut cx| async move {
                // Only local repositories have a backend to read from.
                let RepositoryState::Local(LocalRepositoryState { backend, .. }) = state else {
                    log::error!("tried to recompute diffs for a non-local repository");
                    return Ok(());
                };

                let Some(this) = this.upgrade() else {
                    return Ok(());
                };

                // Phase 1 (main thread): gather, for each open buffer in this
                // repository, its repo path and the currently cached bases.
                let repo_diff_state_updates = this.update(&mut cx, |this, cx| {
                    git_store.update(cx, |git_store, cx| {
                        git_store
                            .diffs
                            .iter()
                            .filter_map(|(buffer_id, diff_state)| {
                                let buffer_store = git_store.buffer_store.read(cx);
                                let buffer = buffer_store.get(*buffer_id)?;
                                let file = File::from_dyn(buffer.read(cx).file())?;
                                let abs_path = file.worktree.read(cx).absolutize(&file.path);
                                // Skip buffers that belong to other repositories.
                                let repo_path = this.abs_path_to_repo_path(&abs_path)?;
                                log::debug!(
                                    "start reload diff bases for repo path {}",
                                    repo_path.as_unix_str()
                                );
                                diff_state.update(cx, |diff_state, _| {
                                    let has_unstaged_diff = diff_state
                                        .unstaged_diff
                                        .as_ref()
                                        .is_some_and(|diff| diff.is_upgradable());
                                    let has_uncommitted_diff = diff_state
                                        .uncommitted_diff
                                        .as_ref()
                                        .is_some_and(|set| set.is_upgradable());

                                    // `None` here means "this base is unused";
                                    // `Some(None)` means "in use but empty".
                                    Some((
                                        buffer,
                                        repo_path,
                                        has_unstaged_diff.then(|| diff_state.index_text.clone()),
                                        has_uncommitted_diff.then(|| diff_state.head_text.clone()),
                                    ))
                                })
                            })
                            .collect::<Vec<_>>()
                    })
                })?;

                // Phase 2 (background): load fresh base texts from git and
                // compute the minimal change for each buffer.
                let buffer_diff_base_changes = cx
                    .background_spawn(async move {
                        let mut changes = Vec::new();
                        for (buffer, repo_path, current_index_text, current_head_text) in
                            &repo_diff_state_updates
                        {
                            let index_text = if current_index_text.is_some() {
                                backend.load_index_text(repo_path.clone()).await
                            } else {
                                None
                            };
                            let head_text = if current_head_text.is_some() {
                                backend.load_committed_text(repo_path.clone()).await
                            } else {
                                None
                            };

                            // Emit only the bases that actually differ from the
                            // cache, collapsing to `SetBoth` when index and
                            // HEAD now agree.
                            let change =
                                match (current_index_text.as_ref(), current_head_text.as_ref()) {
                                    (Some(current_index), Some(current_head)) => {
                                        let index_changed =
                                            index_text.as_deref() != current_index.as_deref();
                                        let head_changed =
                                            head_text.as_deref() != current_head.as_deref();
                                        if index_changed && head_changed {
                                            if index_text == head_text {
                                                Some(DiffBasesChange::SetBoth(head_text))
                                            } else {
                                                Some(DiffBasesChange::SetEach {
                                                    index: index_text,
                                                    head: head_text,
                                                })
                                            }
                                        } else if index_changed {
                                            Some(DiffBasesChange::SetIndex(index_text))
                                        } else if head_changed {
                                            Some(DiffBasesChange::SetHead(head_text))
                                        } else {
                                            None
                                        }
                                    }
                                    (Some(current_index), None) => {
                                        let index_changed =
                                            index_text.as_deref() != current_index.as_deref();
                                        index_changed
                                            .then_some(DiffBasesChange::SetIndex(index_text))
                                    }
                                    (None, Some(current_head)) => {
                                        let head_changed =
                                            head_text.as_deref() != current_head.as_deref();
                                        head_changed.then_some(DiffBasesChange::SetHead(head_text))
                                    }
                                    (None, None) => None,
                                };

                            changes.push((buffer.clone(), change))
                        }
                        changes
                    })
                    .await;

                // Phase 3 (main thread): mirror each change downstream (if a
                // client is connected), then apply it locally.
                git_store.update(&mut cx, |git_store, cx| {
                    for (buffer, diff_bases_change) in buffer_diff_base_changes {
                        let buffer_snapshot = buffer.read(cx).text_snapshot();
                        let buffer_id = buffer_snapshot.remote_id();
                        let Some(diff_state) = git_store.diffs.get(&buffer_id) else {
                            continue;
                        };

                        let downstream_client = git_store.downstream_client();
                        diff_state.update(cx, |diff_state, cx| {
                            use proto::update_diff_bases::Mode;

                            if let Some((diff_bases_change, (client, project_id))) =
                                diff_bases_change.clone().zip(downstream_client)
                            {
                                let (staged_text, committed_text, mode) = match diff_bases_change {
                                    DiffBasesChange::SetIndex(index) => {
                                        (index, None, Mode::IndexOnly)
                                    }
                                    DiffBasesChange::SetHead(head) => (None, head, Mode::HeadOnly),
                                    DiffBasesChange::SetEach { index, head } => {
                                        (index, head, Mode::IndexAndHead)
                                    }
                                    DiffBasesChange::SetBoth(text) => {
                                        (None, text, Mode::IndexMatchesHead)
                                    }
                                };
                                client
                                    .send(proto::UpdateDiffBases {
                                        project_id: project_id.to_proto(),
                                        buffer_id: buffer_id.to_proto(),
                                        staged_text,
                                        committed_text,
                                        mode: mode as i32,
                                    })
                                    .log_err();
                            }

                            diff_state.diff_bases_changed(buffer_snapshot, diff_bases_change, cx);
                        });
                    }
                })
            },
        );
    }
3970
    /// Enqueues `job` on this repository's git worker without a deduplication
    /// key. `status`, when provided, is shown as the active-job message while
    /// the job runs. The returned receiver yields the job's result, or
    /// `Canceled` if the worker shuts down first.
    pub fn send_job<F, Fut, R>(
        &mut self,
        status: Option<SharedString>,
        job: F,
    ) -> oneshot::Receiver<R>
    where
        F: FnOnce(RepositoryState, AsyncApp) -> Fut + 'static,
        Fut: Future<Output = R> + 'static,
        R: Send + 'static,
    {
        self.send_keyed_job(None, status, job)
    }
3983
    /// Enqueues `job` on this repository's git worker, optionally tagged with a
    /// deduplication `key`.
    ///
    /// The closure receives the resolved `RepositoryState` and an `AsyncApp`.
    /// While it runs, `status` (if any) is recorded in `active_jobs` so the UI
    /// can display what the repository is doing. The returned receiver yields
    /// the job's result, or `Canceled` if the worker is dropped first.
    ///
    /// NOTE(review): the semantics of `key` (coalescing vs. serialization) are
    /// implemented by the worker loop, which is not visible here.
    fn send_keyed_job<F, Fut, R>(
        &mut self,
        key: Option<GitJobKey>,
        status: Option<SharedString>,
        job: F,
    ) -> oneshot::Receiver<R>
    where
        F: FnOnce(RepositoryState, AsyncApp) -> Fut + 'static,
        Fut: Future<Output = R> + 'static,
        R: Send + 'static,
    {
        let (result_tx, result_rx) = futures::channel::oneshot::channel();
        // Each job gets a unique id for the active-jobs bookkeeping.
        let job_id = post_inc(&mut self.job_id);
        let this = self.this.clone();
        self.job_sender
            .unbounded_send(GitJob {
                key,
                job: Box::new(move |state, cx: &mut AsyncApp| {
                    let job = job(state, cx.clone());
                    cx.spawn(async move |cx| {
                        // Register the status message before the job starts.
                        if let Some(s) = status.clone() {
                            this.update(cx, |this, cx| {
                                this.active_jobs.insert(
                                    job_id,
                                    JobInfo {
                                        start: Instant::now(),
                                        message: s.clone(),
                                    },
                                );

                                cx.notify();
                            })
                            .ok();
                        }
                        let result = job.await;

                        // Clear the status entry regardless of the job's outcome.
                        this.update(cx, |this, cx| {
                            this.active_jobs.remove(&job_id);
                            cx.notify();
                        })
                        .ok();

                        // The receiver may have been dropped; ignore send errors.
                        result_tx.send(result).ok();
                    })
                }),
            })
            .ok();
        result_rx
    }
4033
4034 pub fn set_as_active_repository(&self, cx: &mut Context<Self>) {
4035 let Some(git_store) = self.git_store.upgrade() else {
4036 return;
4037 };
4038 let entity = cx.entity();
4039 git_store.update(cx, |git_store, cx| {
4040 let Some((&id, _)) = git_store
4041 .repositories
4042 .iter()
4043 .find(|(_, handle)| *handle == &entity)
4044 else {
4045 return;
4046 };
4047 git_store.active_repo_id = Some(id);
4048 cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
4049 });
4050 }
4051
    /// Returns the file statuses from the cached snapshot, without querying git.
    pub fn cached_status(&self) -> impl '_ + Iterator<Item = StatusEntry> {
        self.snapshot.status()
    }

    /// Returns the stash entries from the cached snapshot, without querying git.
    pub fn cached_stash(&self) -> GitStash {
        self.snapshot.stash_entries.clone()
    }
4059
4060 pub fn repo_path_to_project_path(&self, path: &RepoPath, cx: &App) -> Option<ProjectPath> {
4061 let git_store = self.git_store.upgrade()?;
4062 let worktree_store = git_store.read(cx).worktree_store.read(cx);
4063 let abs_path = self.snapshot.repo_path_to_abs_path(path);
4064 let abs_path = SanitizedPath::new(&abs_path);
4065 let (worktree, relative_path) = worktree_store.find_worktree(abs_path, cx)?;
4066 Some(ProjectPath {
4067 worktree_id: worktree.read(cx).id(),
4068 path: relative_path,
4069 })
4070 }
4071
4072 pub fn project_path_to_repo_path(&self, path: &ProjectPath, cx: &App) -> Option<RepoPath> {
4073 let git_store = self.git_store.upgrade()?;
4074 let worktree_store = git_store.read(cx).worktree_store.read(cx);
4075 let abs_path = worktree_store.absolutize(path, cx)?;
4076 self.snapshot.abs_path_to_repo_path(&abs_path)
4077 }
4078
4079 pub fn contains_sub_repo(&self, other: &Entity<Self>, cx: &App) -> bool {
4080 other
4081 .read(cx)
4082 .snapshot
4083 .work_directory_abs_path
4084 .starts_with(&self.snapshot.work_directory_abs_path)
4085 }
4086
    /// Returns the shared commit-message buffer for this repository, creating
    /// it on first use.
    ///
    /// Locally the buffer is created via the `BufferStore`; for remote
    /// repositories it is opened on the host over RPC and mirrored here. The
    /// result is cached in `commit_message_buffer` for subsequent calls.
    pub fn open_commit_buffer(
        &mut self,
        languages: Option<Arc<LanguageRegistry>>,
        buffer_store: Entity<BufferStore>,
        cx: &mut Context<Self>,
    ) -> Task<Result<Entity<Buffer>>> {
        let id = self.id;
        // Fast path: reuse the previously created buffer.
        if let Some(buffer) = self.commit_message_buffer.clone() {
            return Task::ready(Ok(buffer));
        }
        let this = cx.weak_entity();

        let rx = self.send_job(None, move |state, mut cx| async move {
            let Some(this) = this.upgrade() else {
                bail!("git store was dropped");
            };
            match state {
                RepositoryState::Local(..) => {
                    this.update(&mut cx, |_, cx| {
                        Self::open_local_commit_buffer(languages, buffer_store, cx)
                    })
                    .await
                }
                RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
                    // Ask the host to open (or create) the buffer, then wait
                    // for it to be replicated into our buffer store.
                    let request = client.request(proto::OpenCommitMessageBuffer {
                        project_id: project_id.0,
                        repository_id: id.to_proto(),
                    });
                    let response = request.await.context("requesting to open commit buffer")?;
                    let buffer_id = BufferId::new(response.buffer_id)?;
                    let buffer = buffer_store
                        .update(&mut cx, |buffer_store, cx| {
                            buffer_store.wait_for_remote_buffer(buffer_id, cx)
                        })
                        .await?;
                    // Assign the "Git Commit" language when a registry is available.
                    if let Some(language_registry) = languages {
                        let git_commit_language =
                            language_registry.language_for_name("Git Commit").await?;
                        buffer.update(&mut cx, |buffer, cx| {
                            buffer.set_language(Some(git_commit_language), cx);
                        });
                    }
                    this.update(&mut cx, |this, _| {
                        this.commit_message_buffer = Some(buffer.clone());
                    });
                    Ok(buffer)
                }
            }
        });

        cx.spawn(|_, _: &mut AsyncApp| async move { rx.await? })
    }
4139
4140 fn open_local_commit_buffer(
4141 language_registry: Option<Arc<LanguageRegistry>>,
4142 buffer_store: Entity<BufferStore>,
4143 cx: &mut Context<Self>,
4144 ) -> Task<Result<Entity<Buffer>>> {
4145 cx.spawn(async move |repository, cx| {
4146 let git_commit_language = match language_registry {
4147 Some(language_registry) => {
4148 Some(language_registry.language_for_name("Git Commit").await?)
4149 }
4150 None => None,
4151 };
4152 let buffer = buffer_store
4153 .update(cx, |buffer_store, cx| {
4154 buffer_store.create_buffer(git_commit_language, false, cx)
4155 })
4156 .await?;
4157
4158 repository.update(cx, |repository, _| {
4159 repository.commit_message_buffer = Some(buffer.clone());
4160 })?;
4161 Ok(buffer)
4162 })
4163 }
4164
    /// Restores the given `paths` to their state at `commit` (git checkout),
    /// tracking the affected paths as pending `Reverted` operations while the
    /// job runs.
    pub fn checkout_files(
        &mut self,
        commit: &str,
        paths: Vec<RepoPath>,
        cx: &mut Context<Self>,
    ) -> Task<Result<()>> {
        let commit = commit.to_string();
        let id = self.id;

        self.spawn_job_with_tracking(
            paths.clone(),
            pending_op::GitStatus::Reverted,
            cx,
            async move |this, cx| {
                this.update(cx, |this, _cx| {
                    this.send_job(
                        // The status text shown while the checkout runs.
                        Some(format!("git checkout {}", commit).into()),
                        move |git_repo, _| async move {
                            match git_repo {
                                // Local repositories call straight into the backend.
                                RepositoryState::Local(LocalRepositoryState {
                                    backend,
                                    environment,
                                    ..
                                }) => {
                                    backend
                                        .checkout_files(commit, paths, environment.clone())
                                        .await
                                }
                                // Remote repositories forward the request over RPC.
                                RepositoryState::Remote(RemoteRepositoryState {
                                    project_id,
                                    client,
                                }) => {
                                    client
                                        .request(proto::GitCheckoutFiles {
                                            project_id: project_id.0,
                                            repository_id: id.to_proto(),
                                            commit,
                                            paths: paths
                                                .into_iter()
                                                .map(|p| p.to_proto())
                                                .collect(),
                                        })
                                        .await?;

                                    Ok(())
                                }
                            }
                        },
                    )
                })?
                .await?
            },
        )
    }
4219
    /// Resets HEAD to `commit` using the given mode.
    ///
    /// Note: only `Soft` and `Mixed` are representable in the RPC protocol's
    /// `git_reset::ResetMode`, as the match below shows.
    pub fn reset(
        &mut self,
        commit: String,
        reset_mode: ResetMode,
        _cx: &mut App,
    ) -> oneshot::Receiver<Result<()>> {
        let id = self.id;

        self.send_job(None, move |git_repo, _| async move {
            match git_repo {
                RepositoryState::Local(LocalRepositoryState {
                    backend,
                    environment,
                    ..
                }) => backend.reset(commit, reset_mode, environment).await,
                RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
                    client
                        .request(proto::GitReset {
                            project_id: project_id.0,
                            repository_id: id.to_proto(),
                            commit,
                            mode: match reset_mode {
                                ResetMode::Soft => git_reset::ResetMode::Soft.into(),
                                ResetMode::Mixed => git_reset::ResetMode::Mixed.into(),
                            },
                        })
                        .await?;

                    Ok(())
                }
            }
        })
    }
4253
    /// Loads the details (sha, message, author, timestamp) of a single commit.
    pub fn show(&mut self, commit: String) -> oneshot::Receiver<Result<CommitDetails>> {
        let id = self.id;
        self.send_job(None, move |git_repo, _cx| async move {
            match git_repo {
                RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
                    backend.show(commit).await
                }
                RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
                    let resp = client
                        .request(proto::GitShow {
                            project_id: project_id.0,
                            repository_id: id.to_proto(),
                            commit,
                        })
                        .await?;

                    // Reassemble the proto response into the shared type.
                    Ok(CommitDetails {
                        sha: resp.sha.into(),
                        message: resp.message.into(),
                        commit_timestamp: resp.commit_timestamp,
                        author_email: resp.author_email.into(),
                        author_name: resp.author_name.into(),
                    })
                }
            }
        })
    }
4281
    /// Loads the per-file diff (old/new text) introduced by `commit`.
    pub fn load_commit_diff(&mut self, commit: String) -> oneshot::Receiver<Result<CommitDiff>> {
        let id = self.id;
        self.send_job(None, move |git_repo, cx| async move {
            match git_repo {
                RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
                    backend.load_commit(commit, cx).await
                }
                RepositoryState::Remote(RemoteRepositoryState {
                    client, project_id, ..
                }) => {
                    let response = client
                        .request(proto::LoadCommitDiff {
                            project_id: project_id.0,
                            repository_id: id.to_proto(),
                            commit,
                        })
                        .await?;
                    Ok(CommitDiff {
                        files: response
                            .files
                            .into_iter()
                            .map(|file| {
                                Ok(CommitFile {
                                    path: RepoPath::from_proto(&file.path)?,
                                    old_text: file.old_text,
                                    new_text: file.new_text,
                                    is_binary: file.is_binary,
                                })
                            })
                            // The first malformed path fails the whole diff.
                            .collect::<Result<Vec<_>>>()?,
                    })
                }
            }
        })
    }
4317
    /// Loads the commit history for `path` from the beginning, with no limit
    /// (convenience wrapper around `file_history_paginated`).
    pub fn file_history(
        &mut self,
        path: RepoPath,
    ) -> oneshot::Receiver<Result<git::repository::FileHistory>> {
        self.file_history_paginated(path, 0, None)
    }
4324
    /// Loads a page of commit history for `path`, skipping the first `skip`
    /// entries and returning at most `limit` (all remaining when `None`).
    pub fn file_history_paginated(
        &mut self,
        path: RepoPath,
        skip: usize,
        limit: Option<usize>,
    ) -> oneshot::Receiver<Result<git::repository::FileHistory>> {
        let id = self.id;
        self.send_job(None, move |git_repo, _cx| async move {
            match git_repo {
                RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
                    backend.file_history_paginated(path, skip, limit).await
                }
                RepositoryState::Remote(RemoteRepositoryState { client, project_id }) => {
                    let response = client
                        .request(proto::GitFileHistory {
                            project_id: project_id.0,
                            repository_id: id.to_proto(),
                            path: path.to_proto(),
                            skip: skip as u64,
                            limit: limit.map(|l| l as u64),
                        })
                        .await?;
                    Ok(git::repository::FileHistory {
                        entries: response
                            .entries
                            .into_iter()
                            .map(|entry| git::repository::FileHistoryEntry {
                                sha: entry.sha.into(),
                                subject: entry.subject.into(),
                                message: entry.message.into(),
                                commit_timestamp: entry.commit_timestamp,
                                author_name: entry.author_name.into(),
                                author_email: entry.author_email.into(),
                            })
                            .collect(),
                        // The path may have been normalized by the host.
                        path: RepoPath::from_proto(&response.path)?,
                    })
                }
            }
        })
    }
4366
    /// Returns the slice of initial graph commit data loaded so far for
    /// `(log_order, log_source)`, plus `true` while loading is still in
    /// progress. The first call for a given key spawns the background load.
    ///
    /// Remote (collab) repositories are not supported yet and resolve to an
    /// error inside the loading task.
    pub fn graph_data(
        &mut self,
        log_source: LogSource,
        log_order: LogOrder,
        range: Range<usize>,
        cx: &mut Context<Self>,
    ) -> (&[Arc<InitialGraphCommitData>], bool) {
        let (loading_task, initial_commit_data) = self
            .initial_graph_data
            .entry((log_order, log_source.clone()))
            .or_insert_with(|| {
                let state = self.repository_state.clone();
                let log_source = log_source.clone();
                (
                    cx.spawn(async move |repository, cx| {
                        let state = state.await;
                        match state {
                            Ok(RepositoryState::Local(LocalRepositoryState {
                                backend, ..
                            })) => {
                                Self::local_git_graph_data(
                                    repository, backend, log_source, log_order, cx,
                                )
                                .await
                            }
                            Ok(RepositoryState::Remote(_)) => {
                                Err("Git graph is not supported for collab yet".into())
                            }
                            Err(e) => Err(SharedString::from(e)),
                        }
                    }),
                    vec![],
                )
            });

        // Clamp the requested range to what has been loaded so far.
        // NOTE(review): `start` is clamped to `len - 1` while `end` is clamped
        // to `len`, so a fully out-of-range request (e.g. `len..len`) yields
        // the final element rather than an empty slice — confirm intended.
        let max_start = initial_commit_data.len().saturating_sub(1);
        let max_end = initial_commit_data.len();
        (
            &initial_commit_data[range.start.min(max_start)..range.end.min(max_end)],
            !loading_task.is_ready(),
        )
    }
4409
    /// Streams initial git-graph commit data from the local backend into
    /// `self.initial_graph_data`, emitting `GitGraphCountUpdated` after each
    /// batch is appended.
    ///
    /// The backend traversal runs on the background executor and delivers
    /// batches through an unbounded channel; this function drains them on the
    /// foreground and only surfaces the traversal's error once the channel
    /// closes.
    async fn local_git_graph_data(
        this: WeakEntity<Self>,
        backend: Arc<dyn GitRepository>,
        log_source: LogSource,
        log_order: LogOrder,
        cx: &mut AsyncApp,
    ) -> Result<(), SharedString> {
        let (request_tx, request_rx) =
            smol::channel::unbounded::<Vec<Arc<InitialGraphCommitData>>>();

        let task = cx.background_executor().spawn({
            let log_source = log_source.clone();
            async move {
                backend
                    .initial_graph_data(log_source, log_order, request_tx)
                    .await
                    .map_err(|err| SharedString::from(err.to_string()))
            }
        });

        let graph_data_key = (log_order, log_source.clone());

        // Drain batches until the backend task drops its sender.
        while let Ok(initial_graph_commit_data) = request_rx.recv().await {
            this.update(cx, |repository, cx| {
                let graph_data = repository
                    .initial_graph_data
                    .get_mut(&graph_data_key)
                    .map(|(_, graph_data)| graph_data);
                debug_assert!(
                    graph_data.is_some(),
                    "This task should be dropped if data doesn't exist"
                );

                if let Some(graph_data) = graph_data {
                    graph_data.extend(initial_graph_commit_data);
                    cx.emit(RepositoryEvent::GitGraphCountUpdated(
                        graph_data_key.clone(),
                        graph_data.len(),
                    ));
                }
            })
            // The entity may have been dropped; stop updating but keep draining.
            .ok();
        }

        // Propagate any error from the traversal itself.
        task.await?;

        Ok(())
    }
4458
    /// Returns the cached state of the commit data for `sha`, queueing a
    /// background load if it hasn't been requested yet.
    ///
    /// If the background handler is closed this (re)opens it; the request is
    /// then picked up on a subsequent call once the handler is running.
    pub fn fetch_commit_data(&mut self, sha: Oid, cx: &mut Context<Self>) -> &CommitDataState {
        if !self.commit_data.contains_key(&sha) {
            match &self.graph_commit_data_handler {
                GraphCommitHandlerState::Open(handler) => {
                    // Only record `Loading` when the request was actually queued.
                    if handler.commit_data_request.try_send(sha).is_ok() {
                        let old_value = self.commit_data.insert(sha, CommitDataState::Loading);
                        debug_assert!(old_value.is_none(), "We should never overwrite commit data");
                    }
                }
                GraphCommitHandlerState::Closed => {
                    self.open_graph_commit_data_handler(cx);
                }
                // Handler is spinning up; nothing to do until it's open.
                GraphCommitHandlerState::Starting => {}
            }
        }

        self.commit_data
            .get(&sha)
            .unwrap_or(&CommitDataState::Loading)
    }
4479
    /// Starts the background commit-data reader pipeline.
    ///
    /// A background task owns the backend's `commit_data_reader`, serving shas
    /// received on the request channel and pushing results to a foreground
    /// task that caches them in `self.commit_data` and calls `cx.notify()`.
    /// The background loop exits after 10 seconds without requests, after
    /// which the handler is marked `Closed` so a later request reopens it.
    fn open_graph_commit_data_handler(&mut self, cx: &mut Context<Self>) {
        self.graph_commit_data_handler = GraphCommitHandlerState::Starting;

        let state = self.repository_state.clone();
        // Bounded result channel applies backpressure to the reader.
        let (result_tx, result_rx) = smol::channel::bounded::<(Oid, GraphCommitData)>(64);
        let (request_tx, request_rx) = smol::channel::unbounded::<Oid>();

        let foreground_task = cx.spawn(async move |this, cx| {
            // Cache each loaded commit until the result channel closes.
            while let Ok((sha, commit_data)) = result_rx.recv().await {
                let result = this.update(cx, |this, cx| {
                    let old_value = this
                        .commit_data
                        .insert(sha, CommitDataState::Loaded(Arc::new(commit_data)));
                    debug_assert!(
                        !matches!(old_value, Some(CommitDataState::Loaded(_))),
                        "We should never overwrite commit data"
                    );

                    cx.notify();
                });
                if result.is_err() {
                    break;
                }
            }

            // Reader shut down (idle timeout or error): allow reopening later.
            this.update(cx, |this, _cx| {
                this.graph_commit_data_handler = GraphCommitHandlerState::Closed;
            })
            .ok();
        });

        let request_tx_for_handler = request_tx;
        let background_executor = cx.background_executor().clone();

        cx.background_spawn(async move {
            let backend = match state.await {
                Ok(RepositoryState::Local(LocalRepositoryState { backend, .. })) => backend,
                Ok(RepositoryState::Remote(_)) => {
                    log::error!("commit_data_reader not supported for remote repositories");
                    return;
                }
                Err(error) => {
                    log::error!("failed to get repository state: {error}");
                    return;
                }
            };

            let reader = match backend.commit_data_reader() {
                Ok(reader) => reader,
                Err(error) => {
                    log::error!("failed to create commit data reader: {error:?}");
                    return;
                }
            };

            loop {
                // Idle timeout: a fresh 10s timer races each receive.
                let timeout = background_executor.timer(std::time::Duration::from_secs(10));

                futures::select_biased! {
                    sha = futures::FutureExt::fuse(request_rx.recv()) => {
                        let Ok(sha) = sha else {
                            break;
                        };

                        match reader.read(sha).await {
                            Ok(commit_data) => {
                                if result_tx.send((sha, commit_data)).await.is_err() {
                                    break;
                                }
                            }
                            Err(error) => {
                                // Keep serving other requests after one failure.
                                log::error!("failed to read commit data for {sha}: {error:?}");
                            }
                        }
                    }
                    _ = futures::FutureExt::fuse(timeout) => {
                        break;
                    }
                }
            }

            // Closing the result channel lets the foreground task finish.
            drop(result_tx);
        })
        .detach();

        self.graph_commit_data_handler = GraphCommitHandlerState::Open(GraphCommitDataHandler {
            _task: foreground_task,
            commit_data_request: request_tx_for_handler,
        });
    }
4570
4571 fn buffer_store(&self, cx: &App) -> Option<Entity<BufferStore>> {
4572 Some(self.git_store.upgrade()?.read(cx).buffer_store.clone())
4573 }
4574
    /// Collects save tasks for the open buffers backing `entries`.
    ///
    /// A buffer is saved only if its file still exists on disk and it has
    /// unsaved edits; paths with no open buffer are skipped. Callers await
    /// the returned tasks before mutating the index.
    fn save_buffers<'a>(
        &self,
        entries: impl IntoIterator<Item = &'a RepoPath>,
        cx: &mut Context<Self>,
    ) -> Vec<Task<anyhow::Result<()>>> {
        let mut save_futures = Vec::new();
        if let Some(buffer_store) = self.buffer_store(cx) {
            buffer_store.update(cx, |buffer_store, cx| {
                for path in entries {
                    let Some(project_path) = self.repo_path_to_project_path(path, cx) else {
                        continue;
                    };
                    if let Some(buffer) = buffer_store.get_by_path(&project_path)
                        && buffer
                            .read(cx)
                            .file()
                            .is_some_and(|file| file.disk_state().exists())
                        && buffer.read(cx).has_unsaved_edits()
                    {
                        save_futures.push(buffer_store.save_buffer(buffer, cx));
                    }
                }
            })
        }
        save_futures
    }
4601
    /// Stages the given paths (`git add`); see `stage_or_unstage_entries`.
    pub fn stage_entries(
        &mut self,
        entries: Vec<RepoPath>,
        cx: &mut Context<Self>,
    ) -> Task<anyhow::Result<()>> {
        self.stage_or_unstage_entries(true, entries, cx)
    }
4609
    /// Unstages the given paths (`git reset`); see `stage_or_unstage_entries`.
    pub fn unstage_entries(
        &mut self,
        entries: Vec<RepoPath>,
        cx: &mut Context<Self>,
    ) -> Task<anyhow::Result<()>> {
        self.stage_or_unstage_entries(false, entries, cx)
    }
4617
    /// Stages or unstages `entries`, saving any dirty open buffers first so
    /// the index write matches disk state.
    ///
    /// Tracks a pending Staged/Unstaged op per path, serializes with other
    /// index writes via `GitJobKey::WriteIndex`, and optimistically flips all
    /// hunks in each open uncommitted diff — recording per-diff operation
    /// counts so the optimistic state can be committed on success or rolled
    /// back (pending hunks cleared) on failure.
    fn stage_or_unstage_entries(
        &mut self,
        stage: bool,
        entries: Vec<RepoPath>,
        cx: &mut Context<Self>,
    ) -> Task<anyhow::Result<()>> {
        if entries.is_empty() {
            return Task::ready(Ok(()));
        }
        let Some(git_store) = self.git_store.upgrade() else {
            return Task::ready(Ok(()));
        };
        let id = self.id;
        let save_tasks = self.save_buffers(&entries, cx);
        // Human-readable status line, e.g. "git add a b c".
        let paths = entries
            .iter()
            .map(|p| p.as_unix_str())
            .collect::<Vec<_>>()
            .join(" ");
        let status = if stage {
            format!("git add {paths}")
        } else {
            format!("git reset {paths}")
        };
        // Index writes touching the same paths must run sequentially.
        let job_key = GitJobKey::WriteIndex(entries.clone());

        self.spawn_job_with_tracking(
            entries.clone(),
            if stage {
                pending_op::GitStatus::Staged
            } else {
                pending_op::GitStatus::Unstaged
            },
            cx,
            async move |this, cx| {
                // Flush unsaved edits before touching the index.
                for save_task in save_tasks {
                    save_task.await?;
                }

                this.update(cx, |this, cx| {
                    let weak_this = cx.weak_entity();
                    this.send_keyed_job(
                        Some(job_key),
                        Some(status.into()),
                        move |git_repo, mut cx| async move {
                            // Optimistically stage/unstage all hunks in every
                            // open uncommitted diff for these paths, collecting
                            // each diff's operation count for reconciliation.
                            let hunk_staging_operation_counts = weak_this
                                .update(&mut cx, |this, cx| {
                                    let mut hunk_staging_operation_counts = HashMap::default();
                                    for path in &entries {
                                        let Some(project_path) =
                                            this.repo_path_to_project_path(path, cx)
                                        else {
                                            continue;
                                        };
                                        let Some(buffer) = git_store
                                            .read(cx)
                                            .buffer_store
                                            .read(cx)
                                            .get_by_path(&project_path)
                                        else {
                                            continue;
                                        };
                                        let Some(diff_state) = git_store
                                            .read(cx)
                                            .diffs
                                            .get(&buffer.read(cx).remote_id())
                                            .cloned()
                                        else {
                                            continue;
                                        };
                                        let Some(uncommitted_diff) =
                                            diff_state.read(cx).uncommitted_diff.as_ref().and_then(
                                                |uncommitted_diff| uncommitted_diff.upgrade(),
                                            )
                                        else {
                                            continue;
                                        };
                                        let buffer_snapshot = buffer.read(cx).text_snapshot();
                                        let file_exists = buffer
                                            .read(cx)
                                            .file()
                                            .is_some_and(|file| file.disk_state().exists());
                                        let hunk_staging_operation_count =
                                            diff_state.update(cx, |diff_state, cx| {
                                                uncommitted_diff.update(
                                                    cx,
                                                    |uncommitted_diff, cx| {
                                                        uncommitted_diff
                                                            .stage_or_unstage_all_hunks(
                                                                stage,
                                                                &buffer_snapshot,
                                                                file_exists,
                                                                cx,
                                                            );
                                                    },
                                                );

                                                diff_state.hunk_staging_operation_count += 1;
                                                diff_state.hunk_staging_operation_count
                                            });
                                        hunk_staging_operation_counts.insert(
                                            diff_state.downgrade(),
                                            hunk_staging_operation_count,
                                        );
                                    }
                                    hunk_staging_operation_counts
                                })
                                .unwrap_or_default();

                            // Perform the actual index write.
                            let result = match git_repo {
                                RepositoryState::Local(LocalRepositoryState {
                                    backend,
                                    environment,
                                    ..
                                }) => {
                                    if stage {
                                        backend.stage_paths(entries, environment.clone()).await
                                    } else {
                                        backend.unstage_paths(entries, environment.clone()).await
                                    }
                                }
                                RepositoryState::Remote(RemoteRepositoryState {
                                    project_id,
                                    client,
                                }) => {
                                    if stage {
                                        client
                                            .request(proto::Stage {
                                                project_id: project_id.0,
                                                repository_id: id.to_proto(),
                                                paths: entries
                                                    .into_iter()
                                                    .map(|repo_path| repo_path.to_proto())
                                                    .collect(),
                                            })
                                            .await
                                            .context("sending stage request")
                                            .map(|_| ())
                                    } else {
                                        client
                                            .request(proto::Unstage {
                                                project_id: project_id.0,
                                                repository_id: id.to_proto(),
                                                paths: entries
                                                    .into_iter()
                                                    .map(|repo_path| repo_path.to_proto())
                                                    .collect(),
                                            })
                                            .await
                                            .context("sending unstage request")
                                            .map(|_| ())
                                    }
                                }
                            };

                            // Reconcile the optimistic hunk state: commit the
                            // operation counts on success, clear pending hunks
                            // on failure.
                            for (diff_state, hunk_staging_operation_count) in
                                hunk_staging_operation_counts
                            {
                                diff_state
                                    .update(&mut cx, |diff_state, cx| {
                                        if result.is_ok() {
                                            diff_state.hunk_staging_operation_count_as_of_write =
                                                hunk_staging_operation_count;
                                        } else if let Some(uncommitted_diff) =
                                            &diff_state.uncommitted_diff
                                        {
                                            uncommitted_diff
                                                .update(cx, |uncommitted_diff, cx| {
                                                    uncommitted_diff.clear_pending_hunks(cx);
                                                })
                                                .ok();
                                        }
                                    })
                                    .ok();
                            }

                            result
                        },
                    )
                })?
                .await?
            },
        )
    }
4802
4803 pub fn stage_all(&mut self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
4804 let to_stage = self
4805 .cached_status()
4806 .filter_map(|entry| {
4807 if let Some(ops) = self.pending_ops_for_path(&entry.repo_path) {
4808 if ops.staging() || ops.staged() {
4809 None
4810 } else {
4811 Some(entry.repo_path)
4812 }
4813 } else if entry.status.staging().is_fully_staged() {
4814 None
4815 } else {
4816 Some(entry.repo_path)
4817 }
4818 })
4819 .collect();
4820 self.stage_or_unstage_entries(true, to_stage, cx)
4821 }
4822
4823 pub fn unstage_all(&mut self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
4824 let to_unstage = self
4825 .cached_status()
4826 .filter_map(|entry| {
4827 if let Some(ops) = self.pending_ops_for_path(&entry.repo_path) {
4828 if !ops.staging() && !ops.staged() {
4829 None
4830 } else {
4831 Some(entry.repo_path)
4832 }
4833 } else if entry.status.staging().is_fully_unstaged() {
4834 None
4835 } else {
4836 Some(entry.repo_path)
4837 }
4838 })
4839 .collect();
4840 self.stage_or_unstage_entries(false, to_unstage, cx)
4841 }
4842
4843 pub fn stash_all(&mut self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
4844 let to_stash = self.cached_status().map(|entry| entry.repo_path).collect();
4845
4846 self.stash_entries(to_stash, cx)
4847 }
4848
    /// Stashes changes to the given paths.
    pub fn stash_entries(
        &mut self,
        entries: Vec<RepoPath>,
        cx: &mut Context<Self>,
    ) -> Task<anyhow::Result<()>> {
        let id = self.id;

        cx.spawn(async move |this, cx| {
            this.update(cx, |this, _| {
                this.send_job(None, move |git_repo, _cx| async move {
                    match git_repo {
                        RepositoryState::Local(LocalRepositoryState {
                            backend,
                            environment,
                            ..
                        }) => backend.stash_paths(entries, environment).await,
                        RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
                            client
                                .request(proto::Stash {
                                    project_id: project_id.0,
                                    repository_id: id.to_proto(),
                                    paths: entries
                                        .into_iter()
                                        .map(|repo_path| repo_path.to_proto())
                                        .collect(),
                                })
                                .await
                                .context("sending stash request")?;
                            Ok(())
                        }
                    }
                })
            })?
            // Double `?`: receiver cancellation, then the job's own result.
            .await??;
            Ok(())
        })
    }
4886
    /// Pops the stash entry at `index` (the latest when `None`).
    pub fn stash_pop(
        &mut self,
        index: Option<usize>,
        cx: &mut Context<Self>,
    ) -> Task<anyhow::Result<()>> {
        let id = self.id;
        cx.spawn(async move |this, cx| {
            this.update(cx, |this, _| {
                this.send_job(None, move |git_repo, _cx| async move {
                    match git_repo {
                        RepositoryState::Local(LocalRepositoryState {
                            backend,
                            environment,
                            ..
                        }) => backend.stash_pop(index, environment).await,
                        RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
                            client
                                .request(proto::StashPop {
                                    project_id: project_id.0,
                                    repository_id: id.to_proto(),
                                    stash_index: index.map(|i| i as u64),
                                })
                                .await
                                .context("sending stash pop request")?;
                            Ok(())
                        }
                    }
                })
            })?
            // Double `?`: receiver cancellation, then the job's own result.
            .await??;
            Ok(())
        })
    }
4920
    /// Applies the stash entry at `index` (the latest when `None`).
    pub fn stash_apply(
        &mut self,
        index: Option<usize>,
        cx: &mut Context<Self>,
    ) -> Task<anyhow::Result<()>> {
        let id = self.id;
        cx.spawn(async move |this, cx| {
            this.update(cx, |this, _| {
                this.send_job(None, move |git_repo, _cx| async move {
                    match git_repo {
                        RepositoryState::Local(LocalRepositoryState {
                            backend,
                            environment,
                            ..
                        }) => backend.stash_apply(index, environment).await,
                        RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
                            client
                                .request(proto::StashApply {
                                    project_id: project_id.0,
                                    repository_id: id.to_proto(),
                                    stash_index: index.map(|i| i as u64),
                                })
                                .await
                                .context("sending stash apply request")?;
                            Ok(())
                        }
                    }
                })
            })?
            // Double `?`: receiver cancellation, then the job's own result.
            .await??;
            Ok(())
        })
    }
4954
4955 pub fn stash_drop(
4956 &mut self,
4957 index: Option<usize>,
4958 cx: &mut Context<Self>,
4959 ) -> oneshot::Receiver<anyhow::Result<()>> {
4960 let id = self.id;
4961 let updates_tx = self
4962 .git_store()
4963 .and_then(|git_store| match &git_store.read(cx).state {
4964 GitStoreState::Local { downstream, .. } => downstream
4965 .as_ref()
4966 .map(|downstream| downstream.updates_tx.clone()),
4967 _ => None,
4968 });
4969 let this = cx.weak_entity();
4970 self.send_job(None, move |git_repo, mut cx| async move {
4971 match git_repo {
4972 RepositoryState::Local(LocalRepositoryState {
4973 backend,
4974 environment,
4975 ..
4976 }) => {
4977 // TODO would be nice to not have to do this manually
4978 let result = backend.stash_drop(index, environment).await;
4979 if result.is_ok()
4980 && let Ok(stash_entries) = backend.stash_entries().await
4981 {
4982 let snapshot = this.update(&mut cx, |this, cx| {
4983 this.snapshot.stash_entries = stash_entries;
4984 cx.emit(RepositoryEvent::StashEntriesChanged);
4985 this.snapshot.clone()
4986 })?;
4987 if let Some(updates_tx) = updates_tx {
4988 updates_tx
4989 .unbounded_send(DownstreamUpdate::UpdateRepository(snapshot))
4990 .ok();
4991 }
4992 }
4993
4994 result
4995 }
4996 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
4997 client
4998 .request(proto::StashDrop {
4999 project_id: project_id.0,
5000 repository_id: id.to_proto(),
5001 stash_index: index.map(|i| i as u64),
5002 })
5003 .await
5004 .context("sending stash pop request")?;
5005 Ok(())
5006 }
5007 }
5008 })
5009 }
5010
    /// Runs the given git hook in the repository.
    pub fn run_hook(&mut self, hook: RunHook, _cx: &mut App) -> oneshot::Receiver<Result<()>> {
        let id = self.id;
        self.send_job(
            // Status label, e.g. "git hook pre-commit".
            Some(format!("git hook {}", hook.as_str()).into()),
            move |git_repo, _cx| async move {
                match git_repo {
                    RepositoryState::Local(LocalRepositoryState {
                        backend,
                        environment,
                        ..
                    }) => backend.run_hook(hook, environment.clone()).await,
                    RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
                        client
                            .request(proto::RunGitHook {
                                project_id: project_id.0,
                                repository_id: id.to_proto(),
                                hook: hook.to_proto(),
                            })
                            .await?;

                        Ok(())
                    }
                }
            },
        )
    }
5037
    /// Creates a commit with `message`, optionally overriding author
    /// name/email, after first running the pre-commit hook.
    pub fn commit(
        &mut self,
        message: SharedString,
        name_and_email: Option<(SharedString, SharedString)>,
        options: CommitOptions,
        askpass: AskPassDelegate,
        cx: &mut App,
    ) -> oneshot::Receiver<Result<()>> {
        let id = self.id;
        let askpass_delegates = self.askpass_delegates.clone();
        let askpass_id = util::post_inc(&mut self.latest_askpass_id);

        // Kick off the pre-commit hook; the commit job awaits it below.
        let rx = self.run_hook(RunHook::PreCommit, cx);

        self.send_job(Some("git commit".into()), move |git_repo, _cx| async move {
            // Fail fast if the pre-commit hook failed or was canceled.
            rx.await??;

            match git_repo {
                RepositoryState::Local(LocalRepositoryState {
                    backend,
                    environment,
                    ..
                }) => {
                    backend
                        .commit(message, name_and_email, options, askpass, environment)
                        .await
                }
                RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
                    // Register the askpass delegate for the duration of the
                    // request so credential prompts can be relayed back here.
                    askpass_delegates.lock().insert(askpass_id, askpass);
                    let _defer = util::defer(|| {
                        let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
                        debug_assert!(askpass_delegate.is_some());
                    });
                    let (name, email) = name_and_email.unzip();
                    client
                        .request(proto::Commit {
                            project_id: project_id.0,
                            repository_id: id.to_proto(),
                            message: String::from(message),
                            name: name.map(String::from),
                            email: email.map(String::from),
                            options: Some(proto::commit::CommitOptions {
                                amend: options.amend,
                                signoff: options.signoff,
                            }),
                            askpass_id,
                        })
                        .await
                        .context("sending commit request")?;

                    Ok(())
                }
            }
        })
    }
5093
    /// Fetches from a remote, returning the command's stdout/stderr.
    pub fn fetch(
        &mut self,
        fetch_options: FetchOptions,
        askpass: AskPassDelegate,
        _cx: &mut App,
    ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
        let askpass_delegates = self.askpass_delegates.clone();
        let askpass_id = util::post_inc(&mut self.latest_askpass_id);
        let id = self.id;

        self.send_job(Some("git fetch".into()), move |git_repo, cx| async move {
            match git_repo {
                RepositoryState::Local(LocalRepositoryState {
                    backend,
                    environment,
                    ..
                }) => backend.fetch(fetch_options, askpass, environment, cx).await,
                RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
                    // Register the askpass delegate for the duration of the
                    // request so credential prompts can be relayed back here.
                    askpass_delegates.lock().insert(askpass_id, askpass);
                    let _defer = util::defer(|| {
                        let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
                        debug_assert!(askpass_delegate.is_some());
                    });

                    let response = client
                        .request(proto::Fetch {
                            project_id: project_id.0,
                            repository_id: id.to_proto(),
                            askpass_id,
                            remote: fetch_options.to_proto(),
                        })
                        .await
                        .context("sending fetch request")?;

                    Ok(RemoteCommandOutput {
                        stdout: response.stdout,
                        stderr: response.stderr,
                    })
                }
            }
        })
    }
5136
5137 pub fn push(
5138 &mut self,
5139 branch: SharedString,
5140 remote_branch: SharedString,
5141 remote: SharedString,
5142 options: Option<PushOptions>,
5143 askpass: AskPassDelegate,
5144 cx: &mut Context<Self>,
5145 ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
5146 let askpass_delegates = self.askpass_delegates.clone();
5147 let askpass_id = util::post_inc(&mut self.latest_askpass_id);
5148 let id = self.id;
5149
5150 let args = options
5151 .map(|option| match option {
5152 PushOptions::SetUpstream => " --set-upstream",
5153 PushOptions::Force => " --force-with-lease",
5154 })
5155 .unwrap_or("");
5156
5157 let updates_tx = self
5158 .git_store()
5159 .and_then(|git_store| match &git_store.read(cx).state {
5160 GitStoreState::Local { downstream, .. } => downstream
5161 .as_ref()
5162 .map(|downstream| downstream.updates_tx.clone()),
5163 _ => None,
5164 });
5165
5166 let this = cx.weak_entity();
5167 self.send_job(
5168 Some(format!("git push {} {} {}:{}", args, remote, branch, remote_branch).into()),
5169 move |git_repo, mut cx| async move {
5170 match git_repo {
5171 RepositoryState::Local(LocalRepositoryState {
5172 backend,
5173 environment,
5174 ..
5175 }) => {
5176 let result = backend
5177 .push(
5178 branch.to_string(),
5179 remote_branch.to_string(),
5180 remote.to_string(),
5181 options,
5182 askpass,
5183 environment.clone(),
5184 cx.clone(),
5185 )
5186 .await;
5187 // TODO would be nice to not have to do this manually
5188 if result.is_ok() {
5189 let branches = backend.branches().await?;
5190 let branch = branches.into_iter().find(|branch| branch.is_head);
5191 log::info!("head branch after scan is {branch:?}");
5192 let snapshot = this.update(&mut cx, |this, cx| {
5193 this.snapshot.branch = branch;
5194 cx.emit(RepositoryEvent::BranchChanged);
5195 this.snapshot.clone()
5196 })?;
5197 if let Some(updates_tx) = updates_tx {
5198 updates_tx
5199 .unbounded_send(DownstreamUpdate::UpdateRepository(snapshot))
5200 .ok();
5201 }
5202 }
5203 result
5204 }
5205 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5206 askpass_delegates.lock().insert(askpass_id, askpass);
5207 let _defer = util::defer(|| {
5208 let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
5209 debug_assert!(askpass_delegate.is_some());
5210 });
5211 let response = client
5212 .request(proto::Push {
5213 project_id: project_id.0,
5214 repository_id: id.to_proto(),
5215 askpass_id,
5216 branch_name: branch.to_string(),
5217 remote_branch_name: remote_branch.to_string(),
5218 remote_name: remote.to_string(),
5219 options: options.map(|options| match options {
5220 PushOptions::Force => proto::push::PushOptions::Force,
5221 PushOptions::SetUpstream => {
5222 proto::push::PushOptions::SetUpstream
5223 }
5224 }
5225 as i32),
5226 })
5227 .await
5228 .context("sending push request")?;
5229
5230 Ok(RemoteCommandOutput {
5231 stdout: response.stdout,
5232 stderr: response.stderr,
5233 })
5234 }
5235 }
5236 },
5237 )
5238 }
5239
    /// Pulls `branch` (or the current branch when `None`) from `remote`,
    /// optionally rebasing instead of merging.
    pub fn pull(
        &mut self,
        branch: Option<SharedString>,
        remote: SharedString,
        rebase: bool,
        askpass: AskPassDelegate,
        _cx: &mut App,
    ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
        let askpass_delegates = self.askpass_delegates.clone();
        let askpass_id = util::post_inc(&mut self.latest_askpass_id);
        let id = self.id;

        // Build the displayed status line, e.g. "git pull --rebase origin main".
        let mut status = "git pull".to_string();
        if rebase {
            status.push_str(" --rebase");
        }
        status.push_str(&format!(" {}", remote));
        if let Some(b) = &branch {
            status.push_str(&format!(" {}", b));
        }

        self.send_job(Some(status.into()), move |git_repo, cx| async move {
            match git_repo {
                RepositoryState::Local(LocalRepositoryState {
                    backend,
                    environment,
                    ..
                }) => {
                    backend
                        .pull(
                            branch.as_ref().map(|b| b.to_string()),
                            remote.to_string(),
                            rebase,
                            askpass,
                            environment.clone(),
                            cx,
                        )
                        .await
                }
                RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
                    // Register the askpass delegate for the duration of the
                    // request so credential prompts can be relayed back here.
                    askpass_delegates.lock().insert(askpass_id, askpass);
                    let _defer = util::defer(|| {
                        let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
                        debug_assert!(askpass_delegate.is_some());
                    });
                    let response = client
                        .request(proto::Pull {
                            project_id: project_id.0,
                            repository_id: id.to_proto(),
                            askpass_id,
                            rebase,
                            branch_name: branch.as_ref().map(|b| b.to_string()),
                            remote_name: remote.to_string(),
                        })
                        .await
                        .context("sending pull request")?;

                    Ok(RemoteCommandOutput {
                        stdout: response.stdout,
                        stderr: response.stderr,
                    })
                }
            }
        })
    }
5305
    /// Writes `content` into the index entry for `path`, serialized with
    /// other writes to the same path via `GitJobKey::WriteIndex`.
    ///
    /// When `hunk_staging_operation_count` is provided, the corresponding
    /// buffer's diff state records it as the count "as of" this write so
    /// hunk-staging bookkeeping knows which operations the index reflects.
    fn spawn_set_index_text_job(
        &mut self,
        path: RepoPath,
        content: Option<String>,
        hunk_staging_operation_count: Option<usize>,
        cx: &mut Context<Self>,
    ) -> oneshot::Receiver<anyhow::Result<()>> {
        let id = self.id;
        let this = cx.weak_entity();
        let git_store = self.git_store.clone();
        let abs_path = self.snapshot.repo_path_to_abs_path(&path);
        self.send_keyed_job(
            Some(GitJobKey::WriteIndex(vec![path.clone()])),
            None,
            move |git_repo, mut cx| async move {
                log::debug!(
                    "start updating index text for buffer {}",
                    path.as_unix_str()
                );

                match git_repo {
                    RepositoryState::Local(LocalRepositoryState {
                        fs,
                        backend,
                        environment,
                        ..
                    }) => {
                        // Preserve the file's executable bit in the index;
                        // missing/unreadable files are treated as non-executable.
                        let executable = match fs.metadata(&abs_path).await {
                            Ok(Some(meta)) => meta.is_executable,
                            Ok(None) => false,
                            Err(_err) => false,
                        };
                        backend
                            .set_index_text(path.clone(), content, environment.clone(), executable)
                            .await?;
                    }
                    RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
                        client
                            .request(proto::SetIndexText {
                                project_id: project_id.0,
                                repository_id: id.to_proto(),
                                path: path.to_proto(),
                                text: content,
                            })
                            .await?;
                    }
                }
                log::debug!(
                    "finish updating index text for buffer {}",
                    path.as_unix_str()
                );

                // Record which hunk-staging operations this write accounts for.
                if let Some(hunk_staging_operation_count) = hunk_staging_operation_count {
                    let project_path = this
                        .read_with(&cx, |this, cx| this.repo_path_to_project_path(&path, cx))
                        .ok()
                        .flatten();
                    git_store
                        .update(&mut cx, |git_store, cx| {
                            let buffer_id = git_store
                                .buffer_store
                                .read(cx)
                                .get_by_path(&project_path?)?
                                .read(cx)
                                .remote_id();
                            let diff_state = git_store.diffs.get(&buffer_id)?;
                            diff_state.update(cx, |diff_state, _| {
                                diff_state.hunk_staging_operation_count_as_of_write =
                                    hunk_staging_operation_count;
                            });
                            Some(())
                        })
                        .context("Git store dropped")?;
                }
                Ok(())
            },
        )
    }
5384
5385 pub fn create_remote(
5386 &mut self,
5387 remote_name: String,
5388 remote_url: String,
5389 ) -> oneshot::Receiver<Result<()>> {
5390 let id = self.id;
5391 self.send_job(
5392 Some(format!("git remote add {remote_name} {remote_url}").into()),
5393 move |repo, _cx| async move {
5394 match repo {
5395 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5396 backend.create_remote(remote_name, remote_url).await
5397 }
5398 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5399 client
5400 .request(proto::GitCreateRemote {
5401 project_id: project_id.0,
5402 repository_id: id.to_proto(),
5403 remote_name,
5404 remote_url,
5405 })
5406 .await?;
5407
5408 Ok(())
5409 }
5410 }
5411 },
5412 )
5413 }
5414
5415 pub fn remove_remote(&mut self, remote_name: String) -> oneshot::Receiver<Result<()>> {
5416 let id = self.id;
5417 self.send_job(
5418 Some(format!("git remove remote {remote_name}").into()),
5419 move |repo, _cx| async move {
5420 match repo {
5421 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5422 backend.remove_remote(remote_name).await
5423 }
5424 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5425 client
5426 .request(proto::GitRemoveRemote {
5427 project_id: project_id.0,
5428 repository_id: id.to_proto(),
5429 remote_name,
5430 })
5431 .await?;
5432
5433 Ok(())
5434 }
5435 }
5436 },
5437 )
5438 }
5439
5440 pub fn get_remotes(
5441 &mut self,
5442 branch_name: Option<String>,
5443 is_push: bool,
5444 ) -> oneshot::Receiver<Result<Vec<Remote>>> {
5445 let id = self.id;
5446 self.send_job(None, move |repo, _cx| async move {
5447 match repo {
5448 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5449 let remote = if let Some(branch_name) = branch_name {
5450 if is_push {
5451 backend.get_push_remote(branch_name).await?
5452 } else {
5453 backend.get_branch_remote(branch_name).await?
5454 }
5455 } else {
5456 None
5457 };
5458
5459 match remote {
5460 Some(remote) => Ok(vec![remote]),
5461 None => backend.get_all_remotes().await,
5462 }
5463 }
5464 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5465 let response = client
5466 .request(proto::GetRemotes {
5467 project_id: project_id.0,
5468 repository_id: id.to_proto(),
5469 branch_name,
5470 is_push,
5471 })
5472 .await?;
5473
5474 let remotes = response
5475 .remotes
5476 .into_iter()
5477 .map(|remotes| Remote {
5478 name: remotes.name.into(),
5479 })
5480 .collect();
5481
5482 Ok(remotes)
5483 }
5484 }
5485 })
5486 }
5487
5488 pub fn branches(&mut self) -> oneshot::Receiver<Result<Vec<Branch>>> {
5489 let id = self.id;
5490 self.send_job(None, move |repo, _| async move {
5491 match repo {
5492 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5493 backend.branches().await
5494 }
5495 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5496 let response = client
5497 .request(proto::GitGetBranches {
5498 project_id: project_id.0,
5499 repository_id: id.to_proto(),
5500 })
5501 .await?;
5502
5503 let branches = response
5504 .branches
5505 .into_iter()
5506 .map(|branch| proto_to_branch(&branch))
5507 .collect();
5508
5509 Ok(branches)
5510 }
5511 }
5512 })
5513 }
5514
5515 pub fn worktrees(&mut self) -> oneshot::Receiver<Result<Vec<GitWorktree>>> {
5516 let id = self.id;
5517 self.send_job(None, move |repo, _| async move {
5518 match repo {
5519 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5520 backend.worktrees().await
5521 }
5522 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5523 let response = client
5524 .request(proto::GitGetWorktrees {
5525 project_id: project_id.0,
5526 repository_id: id.to_proto(),
5527 })
5528 .await?;
5529
5530 let worktrees = response
5531 .worktrees
5532 .into_iter()
5533 .map(|worktree| proto_to_worktree(&worktree))
5534 .collect();
5535
5536 Ok(worktrees)
5537 }
5538 }
5539 })
5540 }
5541
5542 pub fn create_worktree(
5543 &mut self,
5544 name: String,
5545 directory: PathBuf,
5546 commit: Option<String>,
5547 ) -> oneshot::Receiver<Result<()>> {
5548 let id = self.id;
5549 self.send_job(
5550 Some("git worktree add".into()),
5551 move |repo, _cx| async move {
5552 match repo {
5553 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5554 backend.create_worktree(name, directory, commit).await
5555 }
5556 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5557 client
5558 .request(proto::GitCreateWorktree {
5559 project_id: project_id.0,
5560 repository_id: id.to_proto(),
5561 name,
5562 directory: directory.to_string_lossy().to_string(),
5563 commit,
5564 })
5565 .await?;
5566
5567 Ok(())
5568 }
5569 }
5570 },
5571 )
5572 }
5573
5574 pub fn default_branch(
5575 &mut self,
5576 include_remote_name: bool,
5577 ) -> oneshot::Receiver<Result<Option<SharedString>>> {
5578 let id = self.id;
5579 self.send_job(None, move |repo, _| async move {
5580 match repo {
5581 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5582 backend.default_branch(include_remote_name).await
5583 }
5584 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5585 let response = client
5586 .request(proto::GetDefaultBranch {
5587 project_id: project_id.0,
5588 repository_id: id.to_proto(),
5589 })
5590 .await?;
5591
5592 anyhow::Ok(response.branch.map(SharedString::from))
5593 }
5594 }
5595 })
5596 }
5597
    /// Computes the set of files changed between the two trees described by
    /// `diff_type`.
    ///
    /// Local repositories answer from the git backend directly; remote ones
    /// fetch the diff over RPC and decode each entry, dropping (but logging)
    /// entries with malformed paths or missing oids.
    pub fn diff_tree(
        &mut self,
        diff_type: DiffTreeType,
        _cx: &App,
    ) -> oneshot::Receiver<Result<TreeDiff>> {
        let repository_id = self.snapshot.id;
        self.send_job(None, move |repo, _cx| async move {
            match repo {
                RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
                    backend.diff_tree(diff_type).await
                }
                RepositoryState::Remote(RemoteRepositoryState { client, project_id }) => {
                    let response = client
                        .request(proto::GetTreeDiff {
                            project_id: project_id.0,
                            repository_id: repository_id.0,
                            is_merge: matches!(diff_type, DiffTreeType::MergeBase { .. }),
                            base: diff_type.base().to_string(),
                            head: diff_type.head().to_string(),
                        })
                        .await?;

                    // Modified/Deleted entries carry the pre-change blob oid
                    // so callers can load the old content; Added entries
                    // have no old oid.
                    let entries = response
                        .entries
                        .into_iter()
                        .filter_map(|entry| {
                            let status = match entry.status() {
                                proto::tree_diff_status::Status::Added => TreeDiffStatus::Added,
                                proto::tree_diff_status::Status::Modified => {
                                    TreeDiffStatus::Modified {
                                        old: git::Oid::from_str(
                                            &entry.oid.context("missing oid").log_err()?,
                                        )
                                        .log_err()?,
                                    }
                                }
                                proto::tree_diff_status::Status::Deleted => {
                                    TreeDiffStatus::Deleted {
                                        old: git::Oid::from_str(
                                            &entry.oid.context("missing oid").log_err()?,
                                        )
                                        .log_err()?,
                                    }
                                }
                            };
                            Some((
                                RepoPath::from_rel_path(
                                    &RelPath::from_proto(&entry.path).log_err()?,
                                ),
                                status,
                            ))
                        })
                        .collect();

                    Ok(TreeDiff { entries })
                }
            }
        })
    }
5657
5658 pub fn diff(&mut self, diff_type: DiffType, _cx: &App) -> oneshot::Receiver<Result<String>> {
5659 let id = self.id;
5660 self.send_job(None, move |repo, _cx| async move {
5661 match repo {
5662 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5663 backend.diff(diff_type).await
5664 }
5665 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5666 let (proto_diff_type, merge_base_ref) = match &diff_type {
5667 DiffType::HeadToIndex => {
5668 (proto::git_diff::DiffType::HeadToIndex.into(), None)
5669 }
5670 DiffType::HeadToWorktree => {
5671 (proto::git_diff::DiffType::HeadToWorktree.into(), None)
5672 }
5673 DiffType::MergeBase { base_ref } => (
5674 proto::git_diff::DiffType::MergeBase.into(),
5675 Some(base_ref.to_string()),
5676 ),
5677 };
5678 let response = client
5679 .request(proto::GitDiff {
5680 project_id: project_id.0,
5681 repository_id: id.to_proto(),
5682 diff_type: proto_diff_type,
5683 merge_base_ref,
5684 })
5685 .await?;
5686
5687 Ok(response.diff)
5688 }
5689 }
5690 })
5691 }
5692
    /// Creates and checks out a new branch `branch_name`, optionally based
    /// on `base_branch` (`git switch -c <name> [<base>]`).
    ///
    /// NOTE(review): for remote repositories the RPC message carries only
    /// `branch_name`; `base_branch` is not forwarded even though the status
    /// message displays it — confirm whether `proto::GitCreateBranch`
    /// should carry the base branch too.
    pub fn create_branch(
        &mut self,
        branch_name: String,
        base_branch: Option<String>,
    ) -> oneshot::Receiver<Result<()>> {
        let id = self.id;
        let status_msg = if let Some(ref base) = base_branch {
            format!("git switch -c {branch_name} {base}").into()
        } else {
            format!("git switch -c {branch_name}").into()
        };
        self.send_job(Some(status_msg), move |repo, _cx| async move {
            match repo {
                RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
                    backend.create_branch(branch_name, base_branch).await
                }
                RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
                    client
                        .request(proto::GitCreateBranch {
                            project_id: project_id.0,
                            repository_id: id.to_proto(),
                            branch_name,
                        })
                        .await?;

                    Ok(())
                }
            }
        })
    }
5723
5724 pub fn change_branch(&mut self, branch_name: String) -> oneshot::Receiver<Result<()>> {
5725 let id = self.id;
5726 self.send_job(
5727 Some(format!("git switch {branch_name}").into()),
5728 move |repo, _cx| async move {
5729 match repo {
5730 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5731 backend.change_branch(branch_name).await
5732 }
5733 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5734 client
5735 .request(proto::GitChangeBranch {
5736 project_id: project_id.0,
5737 repository_id: id.to_proto(),
5738 branch_name,
5739 })
5740 .await?;
5741
5742 Ok(())
5743 }
5744 }
5745 },
5746 )
5747 }
5748
5749 pub fn delete_branch(&mut self, branch_name: String) -> oneshot::Receiver<Result<()>> {
5750 let id = self.id;
5751 self.send_job(
5752 Some(format!("git branch -d {branch_name}").into()),
5753 move |repo, _cx| async move {
5754 match repo {
5755 RepositoryState::Local(state) => state.backend.delete_branch(branch_name).await,
5756 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5757 client
5758 .request(proto::GitDeleteBranch {
5759 project_id: project_id.0,
5760 repository_id: id.to_proto(),
5761 branch_name,
5762 })
5763 .await?;
5764
5765 Ok(())
5766 }
5767 }
5768 },
5769 )
5770 }
5771
5772 pub fn rename_branch(
5773 &mut self,
5774 branch: String,
5775 new_name: String,
5776 ) -> oneshot::Receiver<Result<()>> {
5777 let id = self.id;
5778 self.send_job(
5779 Some(format!("git branch -m {branch} {new_name}").into()),
5780 move |repo, _cx| async move {
5781 match repo {
5782 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5783 backend.rename_branch(branch, new_name).await
5784 }
5785 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5786 client
5787 .request(proto::GitRenameBranch {
5788 project_id: project_id.0,
5789 repository_id: id.to_proto(),
5790 branch,
5791 new_name,
5792 })
5793 .await?;
5794
5795 Ok(())
5796 }
5797 }
5798 },
5799 )
5800 }
5801
5802 pub fn check_for_pushed_commits(&mut self) -> oneshot::Receiver<Result<Vec<SharedString>>> {
5803 let id = self.id;
5804 self.send_job(None, move |repo, _cx| async move {
5805 match repo {
5806 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5807 backend.check_for_pushed_commit().await
5808 }
5809 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5810 let response = client
5811 .request(proto::CheckForPushedCommits {
5812 project_id: project_id.0,
5813 repository_id: id.to_proto(),
5814 })
5815 .await?;
5816
5817 let branches = response.pushed_to.into_iter().map(Into::into).collect();
5818
5819 Ok(branches)
5820 }
5821 }
5822 })
5823 }
5824
5825 pub fn checkpoint(&mut self) -> oneshot::Receiver<Result<GitRepositoryCheckpoint>> {
5826 self.send_job(None, |repo, _cx| async move {
5827 match repo {
5828 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5829 backend.checkpoint().await
5830 }
5831 RepositoryState::Remote(..) => anyhow::bail!("not implemented yet"),
5832 }
5833 })
5834 }
5835
5836 pub fn restore_checkpoint(
5837 &mut self,
5838 checkpoint: GitRepositoryCheckpoint,
5839 ) -> oneshot::Receiver<Result<()>> {
5840 self.send_job(None, move |repo, _cx| async move {
5841 match repo {
5842 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5843 backend.restore_checkpoint(checkpoint).await
5844 }
5845 RepositoryState::Remote { .. } => anyhow::bail!("not implemented yet"),
5846 }
5847 })
5848 }
5849
    /// Applies a repository-state update pushed from the remote host,
    /// replacing branch/head/merge/stash state and patching the status tree.
    ///
    /// Each `RepositoryEvent` is emitted only when the corresponding piece
    /// of state actually changed.
    pub(crate) fn apply_remote_update(
        &mut self,
        update: proto::UpdateRepository,
        cx: &mut Context<Self>,
    ) -> Result<()> {
        // Malformed conflict paths are dropped (and logged) rather than
        // failing the whole update.
        let conflicted_paths = TreeSet::from_ordered_entries(
            update
                .current_merge_conflicts
                .into_iter()
                .filter_map(|path| RepoPath::from_proto(&path).log_err()),
        );
        let new_branch = update.branch_summary.as_ref().map(proto_to_branch);
        let new_head_commit = update
            .head_commit_details
            .as_ref()
            .map(proto_to_commit_details);
        if self.snapshot.branch != new_branch || self.snapshot.head_commit != new_head_commit {
            cx.emit(RepositoryEvent::BranchChanged)
        }
        self.snapshot.branch = new_branch;
        self.snapshot.head_commit = new_head_commit;

        self.snapshot.merge.conflicted_paths = conflicted_paths;
        self.snapshot.merge.message = update.merge_message.map(SharedString::from);
        let new_stash_entries = GitStash {
            entries: update
                .stash_entries
                .iter()
                .filter_map(|entry| proto_to_stash(entry).ok())
                .collect(),
        };
        if self.snapshot.stash_entries != new_stash_entries {
            cx.emit(RepositoryEvent::StashEntriesChanged)
        }
        self.snapshot.stash_entries = new_stash_entries;
        self.snapshot.remote_upstream_url = update.remote_upstream_url;
        self.snapshot.remote_origin_url = update.remote_origin_url;

        // Statuses arrive as a delta: removed paths first, then upserts.
        let edits = update
            .removed_statuses
            .into_iter()
            .filter_map(|path| {
                Some(sum_tree::Edit::Remove(PathKey(
                    RelPath::from_proto(&path).log_err()?,
                )))
            })
            .chain(
                update
                    .updated_statuses
                    .into_iter()
                    .filter_map(|updated_status| {
                        Some(sum_tree::Edit::Insert(updated_status.try_into().log_err()?))
                    }),
            )
            .collect::<Vec<_>>();
        if !edits.is_empty() {
            cx.emit(RepositoryEvent::StatusesChanged);
        }
        self.snapshot.statuses_by_path.edit(edits, ());
        // The scan id only advances once the final chunk of a multi-part
        // update has been applied.
        if update.is_last_update {
            self.snapshot.scan_id = update.scan_id;
        }
        self.clear_pending_ops(cx);
        Ok(())
    }
5915
5916 pub fn compare_checkpoints(
5917 &mut self,
5918 left: GitRepositoryCheckpoint,
5919 right: GitRepositoryCheckpoint,
5920 ) -> oneshot::Receiver<Result<bool>> {
5921 self.send_job(None, move |repo, _cx| async move {
5922 match repo {
5923 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5924 backend.compare_checkpoints(left, right).await
5925 }
5926 RepositoryState::Remote { .. } => anyhow::bail!("not implemented yet"),
5927 }
5928 })
5929 }
5930
5931 pub fn diff_checkpoints(
5932 &mut self,
5933 base_checkpoint: GitRepositoryCheckpoint,
5934 target_checkpoint: GitRepositoryCheckpoint,
5935 ) -> oneshot::Receiver<Result<String>> {
5936 self.send_job(None, move |repo, _cx| async move {
5937 match repo {
5938 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5939 backend
5940 .diff_checkpoints(base_checkpoint, target_checkpoint)
5941 .await
5942 }
5943 RepositoryState::Remote { .. } => anyhow::bail!("not implemented yet"),
5944 }
5945 })
5946 }
5947
    /// Drops pending ops whose jobs are no longer running, keeping only
    /// in-flight ones, and notifies observers when anything was removed.
    fn clear_pending_ops(&mut self, cx: &mut Context<Self>) {
        let updated = SumTree::from_iter(
            self.pending_ops.iter().filter_map(|ops| {
                let inner_ops: Vec<PendingOp> =
                    ops.ops.iter().filter(|op| op.running()).cloned().collect();
                if inner_ops.is_empty() {
                    None
                } else {
                    Some(PendingOps {
                        repo_path: ops.repo_path.clone(),
                        ops: inner_ops,
                    })
                }
            }),
            (),
        );

        if updated != self.pending_ops {
            // NOTE(review): the event carries the *previous* `pending_ops`
            // (cloned before the assignment below), not `updated` — confirm
            // listeners expect the pre-cleanup snapshot rather than the new
            // state.
            cx.emit(RepositoryEvent::PendingOpsChanged {
                pending_ops: self.pending_ops.clone(),
            })
        }

        self.pending_ops = updated;
    }
5973
    /// Queues a full re-scan of git state for this repository.
    ///
    /// Keyed on `GitJobKey::ReloadGitState`, so a scan already queued behind
    /// this one supersedes it. The fresh snapshot replaces the current one,
    /// resulting events are emitted, and the snapshot is forwarded
    /// downstream when `updates_tx` is provided. Local repositories only.
    fn schedule_scan(
        &mut self,
        updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
        cx: &mut Context<Self>,
    ) {
        let this = cx.weak_entity();
        let _ = self.send_keyed_job(
            Some(GitJobKey::ReloadGitState),
            None,
            |state, mut cx| async move {
                log::debug!("run scheduled git status scan");

                let Some(this) = this.upgrade() else {
                    return Ok(());
                };
                let RepositoryState::Local(LocalRepositoryState { backend, .. }) = state else {
                    bail!("not a local repository")
                };
                // Any queued per-path refreshes are subsumed by the full scan.
                let (snapshot, events) = this
                    .update(&mut cx, |this, _| {
                        this.paths_needing_status_update.clear();
                        compute_snapshot(
                            this.id,
                            this.work_directory_abs_path.clone(),
                            this.snapshot.clone(),
                            backend.clone(),
                        )
                    })
                    .await?;
                this.update(&mut cx, |this, cx| {
                    this.snapshot = snapshot.clone();
                    this.clear_pending_ops(cx);
                    for event in events {
                        cx.emit(event);
                    }
                });
                if let Some(updates_tx) = updates_tx {
                    updates_tx
                        .unbounded_send(DownstreamUpdate::UpdateRepository(snapshot))
                        .ok();
                }
                Ok(())
            },
        );
    }
6019
    /// Spawns the serial job-processing loop for a local repository,
    /// returning the sender used to enqueue `GitJob`s.
    ///
    /// Waits for the repository backend to finish initializing (registering
    /// any additional git hosting providers it reveals), then runs jobs one
    /// at a time in FIFO order. A keyed job is skipped when a newer job with
    /// the same key is already queued behind it. The loop exits once every
    /// sender is dropped.
    fn spawn_local_git_worker(
        state: Shared<Task<Result<LocalRepositoryState, String>>>,
        cx: &mut Context<Self>,
    ) -> mpsc::UnboundedSender<GitJob> {
        let (job_tx, mut job_rx) = mpsc::unbounded::<GitJob>();

        cx.spawn(async move |_, cx| {
            let state = state.await.map_err(|err| anyhow::anyhow!(err))?;
            if let Some(git_hosting_provider_registry) =
                cx.update(|cx| GitHostingProviderRegistry::try_global(cx))
            {
                git_hosting_providers::register_additional_providers(
                    git_hosting_provider_registry,
                    state.backend.clone(),
                )
                .await;
            }
            let state = RepositoryState::Local(state);
            let mut jobs = VecDeque::new();
            loop {
                // Drain everything already queued so the keyed
                // de-duplication below can see the entire backlog.
                while let Ok(Some(next_job)) = job_rx.try_next() {
                    jobs.push_back(next_job);
                }

                if let Some(job) = jobs.pop_front() {
                    // A newer queued job with the same key supersedes this one.
                    if let Some(current_key) = &job.key
                        && jobs
                            .iter()
                            .any(|other_job| other_job.key.as_ref() == Some(current_key))
                    {
                        continue;
                    }
                    (job.job)(state.clone(), cx).await;
                } else if let Some(job) = job_rx.next().await {
                    jobs.push_back(job);
                } else {
                    // All senders dropped; shut the worker down.
                    break;
                }
            }
            anyhow::Ok(())
        })
        .detach_and_log_err(cx);

        job_tx
    }
6065
    /// Spawns the serial job-processing loop for a remote (collaborative)
    /// repository, returning the sender used to enqueue `GitJob`s.
    ///
    /// Same scheduling as `spawn_local_git_worker` — strict FIFO where a
    /// keyed job is skipped when a newer job with the same key is queued
    /// behind it — but with no backend initialization step.
    fn spawn_remote_git_worker(
        state: RemoteRepositoryState,
        cx: &mut Context<Self>,
    ) -> mpsc::UnboundedSender<GitJob> {
        let (job_tx, mut job_rx) = mpsc::unbounded::<GitJob>();

        cx.spawn(async move |_, cx| {
            let state = RepositoryState::Remote(state);
            let mut jobs = VecDeque::new();
            loop {
                // Drain everything already queued so the keyed
                // de-duplication below can see the entire backlog.
                while let Ok(Some(next_job)) = job_rx.try_next() {
                    jobs.push_back(next_job);
                }

                if let Some(job) = jobs.pop_front() {
                    // A newer queued job with the same key supersedes this one.
                    if let Some(current_key) = &job.key
                        && jobs
                            .iter()
                            .any(|other_job| other_job.key.as_ref() == Some(current_key))
                    {
                        continue;
                    }
                    (job.job)(state.clone(), cx).await;
                } else if let Some(job) = job_rx.next().await {
                    jobs.push_back(job);
                } else {
                    // All senders dropped; shut the worker down.
                    break;
                }
            }
            anyhow::Ok(())
        })
        .detach_and_log_err(cx);

        job_tx
    }
6101
    /// Loads the staged (index) text for `repo_path`, or `None` when the
    /// path has no entry in the index.
    fn load_staged_text(
        &mut self,
        buffer_id: BufferId,
        repo_path: RepoPath,
        cx: &App,
    ) -> Task<Result<Option<String>>> {
        let rx = self.send_job(None, move |state, _| async move {
            match state {
                RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
                    anyhow::Ok(backend.load_index_text(repo_path).await)
                }
                RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
                    let response = client
                        .request(proto::OpenUnstagedDiff {
                            project_id: project_id.to_proto(),
                            buffer_id: buffer_id.to_proto(),
                        })
                        .await?;
                    Ok(response.staged_text)
                }
            }
        });
        // Flatten the job receiver into a task; the outer error means the
        // job was dropped before completing.
        cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
    }
6126
    /// Loads the HEAD (committed) text and the index (staged) text for
    /// `repo_path`, collapsed into a single `DiffBasesChange`.
    fn load_committed_text(
        &mut self,
        buffer_id: BufferId,
        repo_path: RepoPath,
        cx: &App,
    ) -> Task<Result<DiffBasesChange>> {
        let rx = self.send_job(None, move |state, _| async move {
            match state {
                RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
                    let committed_text = backend.load_committed_text(repo_path.clone()).await;
                    let staged_text = backend.load_index_text(repo_path).await;
                    // When index and HEAD agree, share a single base instead
                    // of carrying the same text twice.
                    let diff_bases_change = if committed_text == staged_text {
                        DiffBasesChange::SetBoth(committed_text)
                    } else {
                        DiffBasesChange::SetEach {
                            index: staged_text,
                            head: committed_text,
                        }
                    };
                    anyhow::Ok(diff_bases_change)
                }
                RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
                    use proto::open_uncommitted_diff_response::Mode;

                    let response = client
                        .request(proto::OpenUncommittedDiff {
                            project_id: project_id.to_proto(),
                            buffer_id: buffer_id.to_proto(),
                        })
                        .await?;
                    let mode = Mode::from_i32(response.mode).context("Invalid mode")?;
                    let bases = match mode {
                        Mode::IndexMatchesHead => DiffBasesChange::SetBoth(response.committed_text),
                        Mode::IndexAndHead => DiffBasesChange::SetEach {
                            head: response.committed_text,
                            index: response.staged_text,
                        },
                    };
                    Ok(bases)
                }
            }
        });

        // Flatten the job receiver into a task; the outer error means the
        // job was dropped before completing.
        cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
    }
6172
6173 fn load_blob_content(&mut self, oid: Oid, cx: &App) -> Task<Result<String>> {
6174 let repository_id = self.snapshot.id;
6175 let rx = self.send_job(None, move |state, _| async move {
6176 match state {
6177 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
6178 backend.load_blob_content(oid).await
6179 }
6180 RepositoryState::Remote(RemoteRepositoryState { client, project_id }) => {
6181 let response = client
6182 .request(proto::GetBlobContent {
6183 project_id: project_id.to_proto(),
6184 repository_id: repository_id.0,
6185 oid: oid.to_string(),
6186 })
6187 .await?;
6188 Ok(response.content)
6189 }
6190 }
6191 });
6192 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
6193 }
6194
    /// Records `paths` as needing a status refresh and queues a keyed job
    /// that recomputes their git statuses (plus the stash list).
    ///
    /// Keyed on `GitJobKey::RefreshStatuses`: when another refresh is
    /// already queued behind this one, this one is skipped and the later run
    /// picks up the accumulated paths. Local repositories only.
    fn paths_changed(
        &mut self,
        paths: Vec<RepoPath>,
        updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
        cx: &mut Context<Self>,
    ) {
        if !paths.is_empty() {
            self.paths_needing_status_update.push(paths);
        }

        let this = cx.weak_entity();
        let _ = self.send_keyed_job(
            Some(GitJobKey::RefreshStatuses),
            None,
            |state, mut cx| async move {
                // Take the accumulated paths; a refresh queued after this one
                // will then see an empty list and return early.
                let (prev_snapshot, changed_paths) = this.update(&mut cx, |this, _| {
                    (
                        this.snapshot.clone(),
                        mem::take(&mut this.paths_needing_status_update),
                    )
                })?;
                let RepositoryState::Local(LocalRepositoryState { backend, .. }) = state else {
                    bail!("not a local repository")
                };

                if changed_paths.is_empty() {
                    return Ok(());
                }

                let stash_entries = backend.stash_entries().await?;
                let changed_path_statuses = cx
                    .background_spawn(async move {
                        let mut changed_paths =
                            changed_paths.into_iter().flatten().collect::<BTreeSet<_>>();
                        let statuses = backend
                            .status(&changed_paths.iter().cloned().collect::<Vec<_>>())
                            .await?;
                        let mut changed_path_statuses = Vec::new();
                        let prev_statuses = prev_snapshot.statuses_by_path.clone();
                        let mut cursor = prev_statuses.cursor::<PathProgress>(());

                        // Upsert entries whose status differs from the
                        // previous snapshot; both sides are path-ordered, so
                        // a forward-seeking cursor suffices.
                        for (repo_path, status) in &*statuses.entries {
                            changed_paths.remove(repo_path);
                            if cursor.seek_forward(&PathTarget::Path(repo_path), Bias::Left)
                                && cursor.item().is_some_and(|entry| entry.status == *status)
                            {
                                continue;
                            }

                            changed_path_statuses.push(Edit::Insert(StatusEntry {
                                repo_path: repo_path.clone(),
                                status: *status,
                            }));
                        }
                        // Paths absent from the fresh status output are
                        // removed from the tree if they were present before.
                        let mut cursor = prev_statuses.cursor::<PathProgress>(());
                        for path in changed_paths.into_iter() {
                            if cursor.seek_forward(&PathTarget::Path(&path), Bias::Left) {
                                changed_path_statuses
                                    .push(Edit::Remove(PathKey(path.as_ref().clone())));
                            }
                        }
                        anyhow::Ok(changed_path_statuses)
                    })
                    .await?;

                this.update(&mut cx, |this, cx| {
                    if this.snapshot.stash_entries != stash_entries {
                        cx.emit(RepositoryEvent::StashEntriesChanged);
                        this.snapshot.stash_entries = stash_entries;
                    }

                    if !changed_path_statuses.is_empty() {
                        cx.emit(RepositoryEvent::StatusesChanged);
                        this.snapshot
                            .statuses_by_path
                            .edit(changed_path_statuses, ());
                        this.snapshot.scan_id += 1;
                    }

                    if let Some(updates_tx) = updates_tx {
                        updates_tx
                            .unbounded_send(DownstreamUpdate::UpdateRepository(
                                this.snapshot.clone(),
                            ))
                            .ok();
                    }
                })
            },
        );
    }
6285
    /// Returns info (command and start time) for one currently running git
    /// job, if any; no particular job is preferred when several are active.
    pub fn current_job(&self) -> Option<JobInfo> {
        self.active_jobs.values().next().cloned()
    }
6290
    /// Enqueues a no-op job; since the worker runs jobs in order, the
    /// returned receiver resolves once all previously enqueued jobs have
    /// run (or is dropped if the worker shuts down first).
    pub fn barrier(&mut self) -> oneshot::Receiver<()> {
        self.send_job(None, |_, _| async {})
    }
6294
    /// Runs `f` while tracking a pending git operation for each of `paths`,
    /// letting observers display per-path in-flight state.
    ///
    /// When `f` resolves, each recorded op is marked `Finished` on success,
    /// `Skipped` when the underlying job was canceled, or `Error`; errors
    /// other than cancellation are propagated to the caller.
    fn spawn_job_with_tracking<AsyncFn>(
        &mut self,
        paths: Vec<RepoPath>,
        git_status: pending_op::GitStatus,
        cx: &mut Context<Self>,
        f: AsyncFn,
    ) -> Task<Result<()>>
    where
        AsyncFn: AsyncFnOnce(WeakEntity<Repository>, &mut AsyncApp) -> Result<()> + 'static,
    {
        let ids = self.new_pending_ops_for_paths(paths, git_status);

        cx.spawn(async move |this, cx| {
            // Cancellation is reported as success with a `Skipped` status so
            // superseded jobs don't surface as errors.
            let (job_status, result) = match f(this.clone(), cx).await {
                Ok(()) => (pending_op::JobStatus::Finished, Ok(())),
                Err(err) if err.is::<Canceled>() => (pending_op::JobStatus::Skipped, Ok(())),
                Err(err) => (pending_op::JobStatus::Error, Err(err)),
            };

            this.update(cx, |this, _| {
                let mut edits = Vec::with_capacity(ids.len());
                for (id, entry) in ids {
                    if let Some(mut ops) = this
                        .pending_ops
                        .get(&PathKey(entry.as_ref().clone()), ())
                        .cloned()
                    {
                        if let Some(op) = ops.op_by_id_mut(id) {
                            op.job_status = job_status;
                        }
                        edits.push(sum_tree::Edit::Insert(ops));
                    }
                }
                this.pending_ops.edit(edits, ());
            })?;

            result
        })
    }
6334
6335 fn new_pending_ops_for_paths(
6336 &mut self,
6337 paths: Vec<RepoPath>,
6338 git_status: pending_op::GitStatus,
6339 ) -> Vec<(PendingOpId, RepoPath)> {
6340 let mut edits = Vec::with_capacity(paths.len());
6341 let mut ids = Vec::with_capacity(paths.len());
6342 for path in paths {
6343 let mut ops = self
6344 .pending_ops
6345 .get(&PathKey(path.as_ref().clone()), ())
6346 .cloned()
6347 .unwrap_or_else(|| PendingOps::new(&path));
6348 let id = ops.max_id() + 1;
6349 ops.ops.push(PendingOp {
6350 id,
6351 git_status,
6352 job_status: pending_op::JobStatus::Running,
6353 });
6354 edits.push(sum_tree::Edit::Insert(ops));
6355 ids.push((id, path));
6356 }
6357 self.pending_ops.edit(edits, ());
6358 ids
6359 }
6360 pub fn default_remote_url(&self) -> Option<String> {
6361 self.remote_upstream_url
6362 .clone()
6363 .or(self.remote_origin_url.clone())
6364 }
6365}
6366
/// Builds a permalink to the upstream source location of `path`, assuming
/// `path` lives inside a published crate in the Rust registry src cache.
///
/// Searches `path`'s ancestors for the `.cargo_vcs_info.json` that cargo
/// embeds when publishing, reads the origin repository from the adjacent
/// `Cargo.toml`, and links to the published sha with the given line
/// `selection`.
fn get_permalink_in_rust_registry_src(
    provider_registry: Arc<GitHostingProviderRegistry>,
    path: PathBuf,
    selection: Range<u32>,
) -> Result<url::Url> {
    // Minimal deserialization views of the two manifest files; all
    // unrelated fields are ignored.
    #[derive(Deserialize)]
    struct CargoVcsGit {
        sha1: String,
    }

    #[derive(Deserialize)]
    struct CargoVcsInfo {
        git: CargoVcsGit,
        path_in_vcs: String,
    }

    #[derive(Deserialize)]
    struct CargoPackage {
        repository: String,
    }

    #[derive(Deserialize)]
    struct CargoToml {
        package: CargoPackage,
    }

    let Some((dir, cargo_vcs_info_json)) = path.ancestors().skip(1).find_map(|dir| {
        let json = std::fs::read_to_string(dir.join(".cargo_vcs_info.json")).ok()?;
        Some((dir, json))
    }) else {
        bail!("No .cargo_vcs_info.json found in parent directories")
    };
    let cargo_vcs_info = serde_json::from_str::<CargoVcsInfo>(&cargo_vcs_info_json)?;
    let cargo_toml = std::fs::read_to_string(dir.join("Cargo.toml"))?;
    let manifest = toml::from_str::<CargoToml>(&cargo_toml)?;
    let (provider, remote) = parse_git_remote_url(provider_registry, &manifest.package.repository)
        .context("parsing package.repository field of manifest")?;
    // `dir` was found among `path.ancestors()`, so strip_prefix cannot fail.
    let path = PathBuf::from(cargo_vcs_info.path_in_vcs).join(path.strip_prefix(dir).unwrap());
    let permalink = provider.build_permalink(
        remote,
        BuildPermalinkParams::new(
            &cargo_vcs_info.git.sha1,
            &RepoPath::from_rel_path(
                &RelPath::new(&path, PathStyle::local()).context("invalid path")?,
            ),
            Some(selection),
        ),
    );
    Ok(permalink)
}
6417
6418fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
6419 let Some(blame) = blame else {
6420 return proto::BlameBufferResponse {
6421 blame_response: None,
6422 };
6423 };
6424
6425 let entries = blame
6426 .entries
6427 .into_iter()
6428 .map(|entry| proto::BlameEntry {
6429 sha: entry.sha.as_bytes().into(),
6430 start_line: entry.range.start,
6431 end_line: entry.range.end,
6432 original_line_number: entry.original_line_number,
6433 author: entry.author,
6434 author_mail: entry.author_mail,
6435 author_time: entry.author_time,
6436 author_tz: entry.author_tz,
6437 committer: entry.committer_name,
6438 committer_mail: entry.committer_email,
6439 committer_time: entry.committer_time,
6440 committer_tz: entry.committer_tz,
6441 summary: entry.summary,
6442 previous: entry.previous,
6443 filename: entry.filename,
6444 })
6445 .collect::<Vec<_>>();
6446
6447 let messages = blame
6448 .messages
6449 .into_iter()
6450 .map(|(oid, message)| proto::CommitMessage {
6451 oid: oid.as_bytes().into(),
6452 message,
6453 })
6454 .collect::<Vec<_>>();
6455
6456 proto::BlameBufferResponse {
6457 blame_response: Some(proto::blame_buffer_response::BlameResponse { entries, messages }),
6458 }
6459}
6460
6461fn deserialize_blame_buffer_response(
6462 response: proto::BlameBufferResponse,
6463) -> Option<git::blame::Blame> {
6464 let response = response.blame_response?;
6465 let entries = response
6466 .entries
6467 .into_iter()
6468 .filter_map(|entry| {
6469 Some(git::blame::BlameEntry {
6470 sha: git::Oid::from_bytes(&entry.sha).ok()?,
6471 range: entry.start_line..entry.end_line,
6472 original_line_number: entry.original_line_number,
6473 committer_name: entry.committer,
6474 committer_time: entry.committer_time,
6475 committer_tz: entry.committer_tz,
6476 committer_email: entry.committer_mail,
6477 author: entry.author,
6478 author_mail: entry.author_mail,
6479 author_time: entry.author_time,
6480 author_tz: entry.author_tz,
6481 summary: entry.summary,
6482 previous: entry.previous,
6483 filename: entry.filename,
6484 })
6485 })
6486 .collect::<Vec<_>>();
6487
6488 let messages = response
6489 .messages
6490 .into_iter()
6491 .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
6492 .collect::<HashMap<_, _>>();
6493
6494 Some(Blame { entries, messages })
6495}
6496
6497fn branch_to_proto(branch: &git::repository::Branch) -> proto::Branch {
6498 proto::Branch {
6499 is_head: branch.is_head,
6500 ref_name: branch.ref_name.to_string(),
6501 unix_timestamp: branch
6502 .most_recent_commit
6503 .as_ref()
6504 .map(|commit| commit.commit_timestamp as u64),
6505 upstream: branch.upstream.as_ref().map(|upstream| proto::GitUpstream {
6506 ref_name: upstream.ref_name.to_string(),
6507 tracking: upstream
6508 .tracking
6509 .status()
6510 .map(|upstream| proto::UpstreamTracking {
6511 ahead: upstream.ahead as u64,
6512 behind: upstream.behind as u64,
6513 }),
6514 }),
6515 most_recent_commit: branch
6516 .most_recent_commit
6517 .as_ref()
6518 .map(|commit| proto::CommitSummary {
6519 sha: commit.sha.to_string(),
6520 subject: commit.subject.to_string(),
6521 commit_timestamp: commit.commit_timestamp,
6522 author_name: commit.author_name.to_string(),
6523 }),
6524 }
6525}
6526
6527fn worktree_to_proto(worktree: &git::repository::Worktree) -> proto::Worktree {
6528 proto::Worktree {
6529 path: worktree.path.to_string_lossy().to_string(),
6530 ref_name: worktree.ref_name.to_string(),
6531 sha: worktree.sha.to_string(),
6532 }
6533}
6534
6535fn proto_to_worktree(proto: &proto::Worktree) -> git::repository::Worktree {
6536 git::repository::Worktree {
6537 path: PathBuf::from(proto.path.clone()),
6538 ref_name: proto.ref_name.clone().into(),
6539 sha: proto.sha.clone().into(),
6540 }
6541}
6542
6543fn proto_to_branch(proto: &proto::Branch) -> git::repository::Branch {
6544 git::repository::Branch {
6545 is_head: proto.is_head,
6546 ref_name: proto.ref_name.clone().into(),
6547 upstream: proto
6548 .upstream
6549 .as_ref()
6550 .map(|upstream| git::repository::Upstream {
6551 ref_name: upstream.ref_name.to_string().into(),
6552 tracking: upstream
6553 .tracking
6554 .as_ref()
6555 .map(|tracking| {
6556 git::repository::UpstreamTracking::Tracked(UpstreamTrackingStatus {
6557 ahead: tracking.ahead as u32,
6558 behind: tracking.behind as u32,
6559 })
6560 })
6561 .unwrap_or(git::repository::UpstreamTracking::Gone),
6562 }),
6563 most_recent_commit: proto.most_recent_commit.as_ref().map(|commit| {
6564 git::repository::CommitSummary {
6565 sha: commit.sha.to_string().into(),
6566 subject: commit.subject.to_string().into(),
6567 commit_timestamp: commit.commit_timestamp,
6568 author_name: commit.author_name.to_string().into(),
6569 has_parent: true,
6570 }
6571 }),
6572 }
6573}
6574
6575fn commit_details_to_proto(commit: &CommitDetails) -> proto::GitCommitDetails {
6576 proto::GitCommitDetails {
6577 sha: commit.sha.to_string(),
6578 message: commit.message.to_string(),
6579 commit_timestamp: commit.commit_timestamp,
6580 author_email: commit.author_email.to_string(),
6581 author_name: commit.author_name.to_string(),
6582 }
6583}
6584
6585fn proto_to_commit_details(proto: &proto::GitCommitDetails) -> CommitDetails {
6586 CommitDetails {
6587 sha: proto.sha.clone().into(),
6588 message: proto.message.clone().into(),
6589 commit_timestamp: proto.commit_timestamp,
6590 author_email: proto.author_email.clone().into(),
6591 author_name: proto.author_name.clone().into(),
6592 }
6593}
6594
/// Builds a fresh [`RepositorySnapshot`] by querying the git backend for
/// branches, statuses, stash entries, merge details, HEAD, and remote URLs,
/// and returns it together with the [`RepositoryEvent`]s describing what
/// changed relative to `prev_snapshot`.
///
/// `prev_snapshot` supplies the previous state for change detection and
/// carries forward `path_style` and the incremented `scan_id`.
async fn compute_snapshot(
    id: RepositoryId,
    work_directory_abs_path: Arc<Path>,
    prev_snapshot: RepositorySnapshot,
    backend: Arc<dyn GitRepository>,
) -> Result<(RepositorySnapshot, Vec<RepositoryEvent>)> {
    let mut events = Vec::new();
    let branches = backend.branches().await?;
    // The checked-out branch, if any (None in detached-HEAD state).
    let branch = branches.into_iter().find(|branch| branch.is_head);
    // Status of the entire repository, rooted at ".".
    let statuses = backend
        .status(&[RepoPath::from_rel_path(
            &RelPath::new(".".as_ref(), PathStyle::local()).unwrap(),
        )])
        .await?;
    let stash_entries = backend.stash_entries().await?;
    // Index the per-path statuses in a SumTree for efficient range queries
    // and comparison against the previous snapshot.
    let statuses_by_path = SumTree::from_iter(
        statuses
            .entries
            .iter()
            .map(|(repo_path, status)| StatusEntry {
                repo_path: repo_path.clone(),
                status: *status,
            }),
        (),
    );
    let (merge_details, merge_heads_changed) =
        MergeDetails::load(&backend, &statuses_by_path, &prev_snapshot).await?;
    log::debug!("new merge details (changed={merge_heads_changed:?}): {merge_details:?}");

    if merge_heads_changed {
        events.push(RepositoryEvent::MergeHeadsChanged);
    }

    if statuses_by_path != prev_snapshot.statuses_by_path {
        events.push(RepositoryEvent::StatusesChanged)
    }

    // Useful when branch is None in detached head state
    let head_commit = match backend.head_sha().await {
        Some(head_sha) => backend.show(head_sha).await.log_err(),
        None => None,
    };

    // A change to either the branch or the HEAD commit counts as a branch change.
    if branch != prev_snapshot.branch || head_commit != prev_snapshot.head_commit {
        events.push(RepositoryEvent::BranchChanged);
    }

    let remote_origin_url = backend.remote_url("origin").await;
    let remote_upstream_url = backend.remote_url("upstream").await;

    let snapshot = RepositorySnapshot {
        id,
        statuses_by_path,
        work_directory_abs_path,
        path_style: prev_snapshot.path_style,
        scan_id: prev_snapshot.scan_id + 1,
        branch,
        head_commit,
        merge: merge_details,
        remote_origin_url,
        remote_upstream_url,
        stash_entries,
    };

    Ok((snapshot, events))
}
6661
6662fn status_from_proto(
6663 simple_status: i32,
6664 status: Option<proto::GitFileStatus>,
6665) -> anyhow::Result<FileStatus> {
6666 use proto::git_file_status::Variant;
6667
6668 let Some(variant) = status.and_then(|status| status.variant) else {
6669 let code = proto::GitStatus::from_i32(simple_status)
6670 .with_context(|| format!("Invalid git status code: {simple_status}"))?;
6671 let result = match code {
6672 proto::GitStatus::Added => TrackedStatus {
6673 worktree_status: StatusCode::Added,
6674 index_status: StatusCode::Unmodified,
6675 }
6676 .into(),
6677 proto::GitStatus::Modified => TrackedStatus {
6678 worktree_status: StatusCode::Modified,
6679 index_status: StatusCode::Unmodified,
6680 }
6681 .into(),
6682 proto::GitStatus::Conflict => UnmergedStatus {
6683 first_head: UnmergedStatusCode::Updated,
6684 second_head: UnmergedStatusCode::Updated,
6685 }
6686 .into(),
6687 proto::GitStatus::Deleted => TrackedStatus {
6688 worktree_status: StatusCode::Deleted,
6689 index_status: StatusCode::Unmodified,
6690 }
6691 .into(),
6692 _ => anyhow::bail!("Invalid code for simple status: {simple_status}"),
6693 };
6694 return Ok(result);
6695 };
6696
6697 let result = match variant {
6698 Variant::Untracked(_) => FileStatus::Untracked,
6699 Variant::Ignored(_) => FileStatus::Ignored,
6700 Variant::Unmerged(unmerged) => {
6701 let [first_head, second_head] =
6702 [unmerged.first_head, unmerged.second_head].map(|head| {
6703 let code = proto::GitStatus::from_i32(head)
6704 .with_context(|| format!("Invalid git status code: {head}"))?;
6705 let result = match code {
6706 proto::GitStatus::Added => UnmergedStatusCode::Added,
6707 proto::GitStatus::Updated => UnmergedStatusCode::Updated,
6708 proto::GitStatus::Deleted => UnmergedStatusCode::Deleted,
6709 _ => anyhow::bail!("Invalid code for unmerged status: {code:?}"),
6710 };
6711 Ok(result)
6712 });
6713 let [first_head, second_head] = [first_head?, second_head?];
6714 UnmergedStatus {
6715 first_head,
6716 second_head,
6717 }
6718 .into()
6719 }
6720 Variant::Tracked(tracked) => {
6721 let [index_status, worktree_status] = [tracked.index_status, tracked.worktree_status]
6722 .map(|status| {
6723 let code = proto::GitStatus::from_i32(status)
6724 .with_context(|| format!("Invalid git status code: {status}"))?;
6725 let result = match code {
6726 proto::GitStatus::Modified => StatusCode::Modified,
6727 proto::GitStatus::TypeChanged => StatusCode::TypeChanged,
6728 proto::GitStatus::Added => StatusCode::Added,
6729 proto::GitStatus::Deleted => StatusCode::Deleted,
6730 proto::GitStatus::Renamed => StatusCode::Renamed,
6731 proto::GitStatus::Copied => StatusCode::Copied,
6732 proto::GitStatus::Unmodified => StatusCode::Unmodified,
6733 _ => anyhow::bail!("Invalid code for tracked status: {code:?}"),
6734 };
6735 Ok(result)
6736 });
6737 let [index_status, worktree_status] = [index_status?, worktree_status?];
6738 TrackedStatus {
6739 index_status,
6740 worktree_status,
6741 }
6742 .into()
6743 }
6744 };
6745 Ok(result)
6746}
6747
6748fn status_to_proto(status: FileStatus) -> proto::GitFileStatus {
6749 use proto::git_file_status::{Tracked, Unmerged, Variant};
6750
6751 let variant = match status {
6752 FileStatus::Untracked => Variant::Untracked(Default::default()),
6753 FileStatus::Ignored => Variant::Ignored(Default::default()),
6754 FileStatus::Unmerged(UnmergedStatus {
6755 first_head,
6756 second_head,
6757 }) => Variant::Unmerged(Unmerged {
6758 first_head: unmerged_status_to_proto(first_head),
6759 second_head: unmerged_status_to_proto(second_head),
6760 }),
6761 FileStatus::Tracked(TrackedStatus {
6762 index_status,
6763 worktree_status,
6764 }) => Variant::Tracked(Tracked {
6765 index_status: tracked_status_to_proto(index_status),
6766 worktree_status: tracked_status_to_proto(worktree_status),
6767 }),
6768 };
6769 proto::GitFileStatus {
6770 variant: Some(variant),
6771 }
6772}
6773
6774fn unmerged_status_to_proto(code: UnmergedStatusCode) -> i32 {
6775 match code {
6776 UnmergedStatusCode::Added => proto::GitStatus::Added as _,
6777 UnmergedStatusCode::Deleted => proto::GitStatus::Deleted as _,
6778 UnmergedStatusCode::Updated => proto::GitStatus::Updated as _,
6779 }
6780}
6781
6782fn tracked_status_to_proto(code: StatusCode) -> i32 {
6783 match code {
6784 StatusCode::Added => proto::GitStatus::Added as _,
6785 StatusCode::Deleted => proto::GitStatus::Deleted as _,
6786 StatusCode::Modified => proto::GitStatus::Modified as _,
6787 StatusCode::Renamed => proto::GitStatus::Renamed as _,
6788 StatusCode::TypeChanged => proto::GitStatus::TypeChanged as _,
6789 StatusCode::Copied => proto::GitStatus::Copied as _,
6790 StatusCode::Unmodified => proto::GitStatus::Unmodified as _,
6791 }
6792}