1pub mod branch_diff;
2mod conflict_set;
3pub mod git_traversal;
4pub mod pending_op;
5
6use crate::{
7 ProjectEnvironment, ProjectItem, ProjectPath,
8 buffer_store::{BufferStore, BufferStoreEvent},
9 worktree_store::{WorktreeStore, WorktreeStoreEvent},
10};
11use anyhow::{Context as _, Result, anyhow, bail};
12use askpass::{AskPassDelegate, EncryptedPassword, IKnowWhatIAmDoingAndIHaveReadTheDocs};
13use buffer_diff::{BufferDiff, BufferDiffEvent};
14use client::ProjectId;
15use collections::HashMap;
16pub use conflict_set::{ConflictRegion, ConflictSet, ConflictSetSnapshot, ConflictSetUpdate};
17use fs::Fs;
18use futures::{
19 FutureExt, StreamExt,
20 channel::{
21 mpsc,
22 oneshot::{self, Canceled},
23 },
24 future::{self, Shared},
25 stream::FuturesOrdered,
26};
27use git::{
28 BuildPermalinkParams, GitHostingProviderRegistry, Oid, RunHook,
29 blame::Blame,
30 parse_git_remote_url,
31 repository::{
32 Branch, CommitDetails, CommitDiff, CommitFile, CommitOptions, DiffType, FetchOptions,
33 GitRepository, GitRepositoryCheckpoint, GraphCommitData, InitialGraphCommitData, LogOrder,
34 LogSource, PushOptions, Remote, RemoteCommandOutput, RepoPath, ResetMode,
35 UpstreamTrackingStatus, Worktree as GitWorktree,
36 },
37 stash::{GitStash, StashEntry},
38 status::{
39 DiffTreeType, FileStatus, GitSummary, StatusCode, TrackedStatus, TreeDiff, TreeDiffStatus,
40 UnmergedStatus, UnmergedStatusCode,
41 },
42};
43use gpui::{
44 App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Subscription, Task,
45 WeakEntity,
46};
47use language::{
48 Buffer, BufferEvent, Language, LanguageRegistry,
49 proto::{deserialize_version, serialize_version},
50};
51use parking_lot::Mutex;
52use pending_op::{PendingOp, PendingOpId, PendingOps, PendingOpsSummary};
53use postage::stream::Stream as _;
54use rpc::{
55 AnyProtoClient, TypedEnvelope,
56 proto::{self, git_reset, split_repository_update},
57};
58use serde::Deserialize;
59use settings::WorktreeId;
60use smol::future::yield_now;
61use std::{
62 cmp::Ordering,
63 collections::{BTreeSet, HashSet, VecDeque},
64 future::Future,
65 mem,
66 ops::Range,
67 path::{Path, PathBuf},
68 str::FromStr,
69 sync::{
70 Arc,
71 atomic::{self, AtomicU64},
72 },
73 time::Instant,
74};
75use sum_tree::{Edit, SumTree, TreeSet};
76use task::Shell;
77use text::{Bias, BufferId};
78use util::{
79 ResultExt, debug_panic,
80 paths::{PathStyle, SanitizedPath},
81 post_inc,
82 rel_path::RelPath,
83};
84use worktree::{
85 File, PathChange, PathKey, PathProgress, PathSummary, PathTarget, ProjectEntryId,
86 UpdatedGitRepositoriesSet, UpdatedGitRepository, Worktree,
87};
88use zeroize::Zeroize;
89
90pub struct GitStore {
91 state: GitStoreState,
92 buffer_store: Entity<BufferStore>,
93 worktree_store: Entity<WorktreeStore>,
94 repositories: HashMap<RepositoryId, Entity<Repository>>,
95 worktree_ids: HashMap<RepositoryId, HashSet<WorktreeId>>,
96 active_repo_id: Option<RepositoryId>,
97 #[allow(clippy::type_complexity)]
98 loading_diffs:
99 HashMap<(BufferId, DiffKind), Shared<Task<Result<Entity<BufferDiff>, Arc<anyhow::Error>>>>>,
100 diffs: HashMap<BufferId, Entity<BufferGitState>>,
101 shared_diffs: HashMap<proto::PeerId, HashMap<BufferId, SharedDiffs>>,
102 _subscriptions: Vec<Subscription>,
103}
104
105#[derive(Default)]
106struct SharedDiffs {
107 unstaged: Option<Entity<BufferDiff>>,
108 uncommitted: Option<Entity<BufferDiff>>,
109}
110
111struct BufferGitState {
112 unstaged_diff: Option<WeakEntity<BufferDiff>>,
113 uncommitted_diff: Option<WeakEntity<BufferDiff>>,
114 oid_diffs: HashMap<Option<git::Oid>, WeakEntity<BufferDiff>>,
115 conflict_set: Option<WeakEntity<ConflictSet>>,
116 recalculate_diff_task: Option<Task<Result<()>>>,
117 reparse_conflict_markers_task: Option<Task<Result<()>>>,
118 language: Option<Arc<Language>>,
119 language_registry: Option<Arc<LanguageRegistry>>,
120 conflict_updated_futures: Vec<oneshot::Sender<()>>,
121 recalculating_tx: postage::watch::Sender<bool>,
122
123 /// These operation counts are used to ensure that head and index text
124 /// values read from the git repository are up-to-date with any hunk staging
125 /// operations that have been performed on the BufferDiff.
126 ///
127 /// The operation count is incremented immediately when the user initiates a
128 /// hunk stage/unstage operation. Then, upon finishing writing the new index
129 /// text do disk, the `operation count as of write` is updated to reflect
130 /// the operation count that prompted the write.
131 hunk_staging_operation_count: usize,
132 hunk_staging_operation_count_as_of_write: usize,
133
134 head_text: Option<Arc<str>>,
135 index_text: Option<Arc<str>>,
136 oid_texts: HashMap<git::Oid, Arc<str>>,
137 head_changed: bool,
138 index_changed: bool,
139 language_changed: bool,
140}
141
142#[derive(Clone, Debug)]
143enum DiffBasesChange {
144 SetIndex(Option<String>),
145 SetHead(Option<String>),
146 SetEach {
147 index: Option<String>,
148 head: Option<String>,
149 },
150 SetBoth(Option<String>),
151}
152
153#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
154enum DiffKind {
155 Unstaged,
156 Uncommitted,
157 SinceOid(Option<git::Oid>),
158}
159
160enum GitStoreState {
161 Local {
162 next_repository_id: Arc<AtomicU64>,
163 downstream: Option<LocalDownstreamState>,
164 project_environment: Entity<ProjectEnvironment>,
165 fs: Arc<dyn Fs>,
166 },
167 Remote {
168 upstream_client: AnyProtoClient,
169 upstream_project_id: u64,
170 downstream: Option<(AnyProtoClient, ProjectId)>,
171 },
172}
173
174enum DownstreamUpdate {
175 UpdateRepository(RepositorySnapshot),
176 RemoveRepository(RepositoryId),
177}
178
179struct LocalDownstreamState {
180 client: AnyProtoClient,
181 project_id: ProjectId,
182 updates_tx: mpsc::UnboundedSender<DownstreamUpdate>,
183 _task: Task<Result<()>>,
184}
185
186#[derive(Clone, Debug)]
187pub struct GitStoreCheckpoint {
188 checkpoints_by_work_dir_abs_path: HashMap<Arc<Path>, GitRepositoryCheckpoint>,
189}
190
191#[derive(Clone, Debug, PartialEq, Eq)]
192pub struct StatusEntry {
193 pub repo_path: RepoPath,
194 pub status: FileStatus,
195}
196
197impl StatusEntry {
198 fn to_proto(&self) -> proto::StatusEntry {
199 let simple_status = match self.status {
200 FileStatus::Ignored | FileStatus::Untracked => proto::GitStatus::Added as i32,
201 FileStatus::Unmerged { .. } => proto::GitStatus::Conflict as i32,
202 FileStatus::Tracked(TrackedStatus {
203 index_status,
204 worktree_status,
205 }) => tracked_status_to_proto(if worktree_status != StatusCode::Unmodified {
206 worktree_status
207 } else {
208 index_status
209 }),
210 };
211
212 proto::StatusEntry {
213 repo_path: self.repo_path.to_proto(),
214 simple_status,
215 status: Some(status_to_proto(self.status)),
216 }
217 }
218}
219
220impl TryFrom<proto::StatusEntry> for StatusEntry {
221 type Error = anyhow::Error;
222
223 fn try_from(value: proto::StatusEntry) -> Result<Self, Self::Error> {
224 let repo_path = RepoPath::from_proto(&value.repo_path).context("invalid repo path")?;
225 let status = status_from_proto(value.simple_status, value.status)?;
226 Ok(Self { repo_path, status })
227 }
228}
229
230impl sum_tree::Item for StatusEntry {
231 type Summary = PathSummary<GitSummary>;
232
233 fn summary(&self, _: <Self::Summary as sum_tree::Summary>::Context<'_>) -> Self::Summary {
234 PathSummary {
235 max_path: self.repo_path.as_ref().clone(),
236 item_summary: self.status.summary(),
237 }
238 }
239}
240
241impl sum_tree::KeyedItem for StatusEntry {
242 type Key = PathKey;
243
244 fn key(&self) -> Self::Key {
245 PathKey(self.repo_path.as_ref().clone())
246 }
247}
248
249#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
250pub struct RepositoryId(pub u64);
251
252#[derive(Clone, Debug, Default, PartialEq, Eq)]
253pub struct MergeDetails {
254 pub conflicted_paths: TreeSet<RepoPath>,
255 pub message: Option<SharedString>,
256 pub heads: Vec<Option<SharedString>>,
257}
258
259#[derive(Clone)]
260pub enum CommitDataState {
261 Loading,
262 Loaded(Arc<GraphCommitData>),
263}
264
265#[derive(Clone, Debug, PartialEq, Eq)]
266pub struct RepositorySnapshot {
267 pub id: RepositoryId,
268 pub statuses_by_path: SumTree<StatusEntry>,
269 pub work_directory_abs_path: Arc<Path>,
270 pub path_style: PathStyle,
271 pub branch: Option<Branch>,
272 pub head_commit: Option<CommitDetails>,
273 pub scan_id: u64,
274 pub merge: MergeDetails,
275 pub remote_origin_url: Option<String>,
276 pub remote_upstream_url: Option<String>,
277 pub stash_entries: GitStash,
278}
279
280type JobId = u64;
281
282#[derive(Clone, Debug, PartialEq, Eq)]
283pub struct JobInfo {
284 pub start: Instant,
285 pub message: SharedString,
286}
287
288struct GraphCommitDataHandler {
289 _task: Task<()>,
290 commit_data_request: smol::channel::Sender<Oid>,
291}
292
293enum GraphCommitHandlerState {
294 Starting,
295 Open(GraphCommitDataHandler),
296 Closed,
297}
298
299pub struct Repository {
300 this: WeakEntity<Self>,
301 snapshot: RepositorySnapshot,
302 commit_message_buffer: Option<Entity<Buffer>>,
303 git_store: WeakEntity<GitStore>,
304 // For a local repository, holds paths that have had worktree events since the last status scan completed,
305 // and that should be examined during the next status scan.
306 paths_needing_status_update: BTreeSet<RepoPath>,
307 job_sender: mpsc::UnboundedSender<GitJob>,
308 active_jobs: HashMap<JobId, JobInfo>,
309 pending_ops: SumTree<PendingOps>,
310 job_id: JobId,
311 askpass_delegates: Arc<Mutex<HashMap<u64, AskPassDelegate>>>,
312 latest_askpass_id: u64,
313 repository_state: Shared<Task<Result<RepositoryState, String>>>,
314 pub initial_graph_data: HashMap<
315 (LogOrder, LogSource),
316 (
317 Task<Result<(), SharedString>>,
318 Vec<Arc<InitialGraphCommitData>>,
319 ),
320 >,
321 graph_commit_data_handler: GraphCommitHandlerState,
322 commit_data: HashMap<Oid, CommitDataState>,
323}
324
325impl std::ops::Deref for Repository {
326 type Target = RepositorySnapshot;
327
328 fn deref(&self) -> &Self::Target {
329 &self.snapshot
330 }
331}
332
333#[derive(Clone)]
334pub struct LocalRepositoryState {
335 pub fs: Arc<dyn Fs>,
336 pub backend: Arc<dyn GitRepository>,
337 pub environment: Arc<HashMap<String, String>>,
338}
339
340impl LocalRepositoryState {
341 async fn new(
342 work_directory_abs_path: Arc<Path>,
343 dot_git_abs_path: Arc<Path>,
344 project_environment: WeakEntity<ProjectEnvironment>,
345 fs: Arc<dyn Fs>,
346 cx: &mut AsyncApp,
347 ) -> anyhow::Result<Self> {
348 let environment = project_environment
349 .update(cx, |project_environment, cx| {
350 project_environment.local_directory_environment(&Shell::System, work_directory_abs_path.clone(), cx)
351 })?
352 .await
353 .unwrap_or_else(|| {
354 log::error!("failed to get working directory environment for repository {work_directory_abs_path:?}");
355 HashMap::default()
356 });
357 let search_paths = environment.get("PATH").map(|val| val.to_owned());
358 let backend = cx
359 .background_spawn({
360 let fs = fs.clone();
361 async move {
362 let system_git_binary_path = search_paths
363 .and_then(|search_paths| {
364 which::which_in("git", Some(search_paths), &work_directory_abs_path)
365 .ok()
366 })
367 .or_else(|| which::which("git").ok());
368 fs.open_repo(&dot_git_abs_path, system_git_binary_path.as_deref())
369 .with_context(|| format!("opening repository at {dot_git_abs_path:?}"))
370 }
371 })
372 .await?;
373 Ok(LocalRepositoryState {
374 backend,
375 environment: Arc::new(environment),
376 fs,
377 })
378 }
379}
380
381#[derive(Clone)]
382pub struct RemoteRepositoryState {
383 pub project_id: ProjectId,
384 pub client: AnyProtoClient,
385}
386
387#[derive(Clone)]
388pub enum RepositoryState {
389 Local(LocalRepositoryState),
390 Remote(RemoteRepositoryState),
391}
392
393#[derive(Clone, Debug, PartialEq, Eq)]
394pub enum RepositoryEvent {
395 StatusesChanged,
396 MergeHeadsChanged,
397 BranchChanged,
398 StashEntriesChanged,
399 PendingOpsChanged { pending_ops: SumTree<PendingOps> },
400 GitGraphCountUpdated((LogOrder, LogSource), usize),
401}
402
403#[derive(Clone, Debug)]
404pub struct JobsUpdated;
405
406#[derive(Debug)]
407pub enum GitStoreEvent {
408 ActiveRepositoryChanged(Option<RepositoryId>),
409 /// Bool is true when the repository that's updated is the active repository
410 RepositoryUpdated(RepositoryId, RepositoryEvent, bool),
411 RepositoryAdded,
412 RepositoryRemoved(RepositoryId),
413 IndexWriteError(anyhow::Error),
414 JobsUpdated,
415 ConflictsUpdated,
416}
417
418impl EventEmitter<RepositoryEvent> for Repository {}
419impl EventEmitter<JobsUpdated> for Repository {}
420impl EventEmitter<GitStoreEvent> for GitStore {}
421
422pub struct GitJob {
423 job: Box<dyn FnOnce(RepositoryState, &mut AsyncApp) -> Task<()>>,
424 key: Option<GitJobKey>,
425}
426
427#[derive(PartialEq, Eq)]
428enum GitJobKey {
429 WriteIndex(Vec<RepoPath>),
430 ReloadBufferDiffBases,
431 RefreshStatuses,
432 ReloadGitState,
433}
434
435impl GitStore {
436 pub fn local(
437 worktree_store: &Entity<WorktreeStore>,
438 buffer_store: Entity<BufferStore>,
439 environment: Entity<ProjectEnvironment>,
440 fs: Arc<dyn Fs>,
441 cx: &mut Context<Self>,
442 ) -> Self {
443 Self::new(
444 worktree_store.clone(),
445 buffer_store,
446 GitStoreState::Local {
447 next_repository_id: Arc::new(AtomicU64::new(1)),
448 downstream: None,
449 project_environment: environment,
450 fs,
451 },
452 cx,
453 )
454 }
455
456 pub fn remote(
457 worktree_store: &Entity<WorktreeStore>,
458 buffer_store: Entity<BufferStore>,
459 upstream_client: AnyProtoClient,
460 project_id: u64,
461 cx: &mut Context<Self>,
462 ) -> Self {
463 Self::new(
464 worktree_store.clone(),
465 buffer_store,
466 GitStoreState::Remote {
467 upstream_client,
468 upstream_project_id: project_id,
469 downstream: None,
470 },
471 cx,
472 )
473 }
474
475 fn new(
476 worktree_store: Entity<WorktreeStore>,
477 buffer_store: Entity<BufferStore>,
478 state: GitStoreState,
479 cx: &mut Context<Self>,
480 ) -> Self {
481 let _subscriptions = vec![
482 cx.subscribe(&worktree_store, Self::on_worktree_store_event),
483 cx.subscribe(&buffer_store, Self::on_buffer_store_event),
484 ];
485
486 GitStore {
487 state,
488 buffer_store,
489 worktree_store,
490 repositories: HashMap::default(),
491 worktree_ids: HashMap::default(),
492 active_repo_id: None,
493 _subscriptions,
494 loading_diffs: HashMap::default(),
495 shared_diffs: HashMap::default(),
496 diffs: HashMap::default(),
497 }
498 }
499
500 pub fn init(client: &AnyProtoClient) {
501 client.add_entity_request_handler(Self::handle_get_remotes);
502 client.add_entity_request_handler(Self::handle_get_branches);
503 client.add_entity_request_handler(Self::handle_get_default_branch);
504 client.add_entity_request_handler(Self::handle_change_branch);
505 client.add_entity_request_handler(Self::handle_create_branch);
506 client.add_entity_request_handler(Self::handle_rename_branch);
507 client.add_entity_request_handler(Self::handle_create_remote);
508 client.add_entity_request_handler(Self::handle_remove_remote);
509 client.add_entity_request_handler(Self::handle_delete_branch);
510 client.add_entity_request_handler(Self::handle_git_init);
511 client.add_entity_request_handler(Self::handle_push);
512 client.add_entity_request_handler(Self::handle_pull);
513 client.add_entity_request_handler(Self::handle_fetch);
514 client.add_entity_request_handler(Self::handle_stage);
515 client.add_entity_request_handler(Self::handle_unstage);
516 client.add_entity_request_handler(Self::handle_stash);
517 client.add_entity_request_handler(Self::handle_stash_pop);
518 client.add_entity_request_handler(Self::handle_stash_apply);
519 client.add_entity_request_handler(Self::handle_stash_drop);
520 client.add_entity_request_handler(Self::handle_commit);
521 client.add_entity_request_handler(Self::handle_run_hook);
522 client.add_entity_request_handler(Self::handle_reset);
523 client.add_entity_request_handler(Self::handle_show);
524 client.add_entity_request_handler(Self::handle_load_commit_diff);
525 client.add_entity_request_handler(Self::handle_file_history);
526 client.add_entity_request_handler(Self::handle_checkout_files);
527 client.add_entity_request_handler(Self::handle_open_commit_message_buffer);
528 client.add_entity_request_handler(Self::handle_set_index_text);
529 client.add_entity_request_handler(Self::handle_askpass);
530 client.add_entity_request_handler(Self::handle_check_for_pushed_commits);
531 client.add_entity_request_handler(Self::handle_git_diff);
532 client.add_entity_request_handler(Self::handle_tree_diff);
533 client.add_entity_request_handler(Self::handle_get_blob_content);
534 client.add_entity_request_handler(Self::handle_open_unstaged_diff);
535 client.add_entity_request_handler(Self::handle_open_uncommitted_diff);
536 client.add_entity_message_handler(Self::handle_update_diff_bases);
537 client.add_entity_request_handler(Self::handle_get_permalink_to_line);
538 client.add_entity_request_handler(Self::handle_blame_buffer);
539 client.add_entity_message_handler(Self::handle_update_repository);
540 client.add_entity_message_handler(Self::handle_remove_repository);
541 client.add_entity_request_handler(Self::handle_git_clone);
542 client.add_entity_request_handler(Self::handle_get_worktrees);
543 client.add_entity_request_handler(Self::handle_create_worktree);
544 }
545
546 pub fn is_local(&self) -> bool {
547 matches!(self.state, GitStoreState::Local { .. })
548 }
549 pub fn set_active_repo_for_path(&mut self, project_path: &ProjectPath, cx: &mut Context<Self>) {
550 if let Some((repo, _)) = self.repository_and_path_for_project_path(project_path, cx) {
551 let id = repo.read(cx).id;
552 if self.active_repo_id != Some(id) {
553 self.active_repo_id = Some(id);
554 cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
555 }
556 }
557 }
558
559 pub fn shared(&mut self, project_id: u64, client: AnyProtoClient, cx: &mut Context<Self>) {
560 match &mut self.state {
561 GitStoreState::Remote {
562 downstream: downstream_client,
563 ..
564 } => {
565 for repo in self.repositories.values() {
566 let update = repo.read(cx).snapshot.initial_update(project_id);
567 for update in split_repository_update(update) {
568 client.send(update).log_err();
569 }
570 }
571 *downstream_client = Some((client, ProjectId(project_id)));
572 }
573 GitStoreState::Local {
574 downstream: downstream_client,
575 ..
576 } => {
577 let mut snapshots = HashMap::default();
578 let (updates_tx, mut updates_rx) = mpsc::unbounded();
579 for repo in self.repositories.values() {
580 updates_tx
581 .unbounded_send(DownstreamUpdate::UpdateRepository(
582 repo.read(cx).snapshot.clone(),
583 ))
584 .ok();
585 }
586 *downstream_client = Some(LocalDownstreamState {
587 client: client.clone(),
588 project_id: ProjectId(project_id),
589 updates_tx,
590 _task: cx.spawn(async move |this, cx| {
591 cx.background_spawn(async move {
592 while let Some(update) = updates_rx.next().await {
593 match update {
594 DownstreamUpdate::UpdateRepository(snapshot) => {
595 if let Some(old_snapshot) = snapshots.get_mut(&snapshot.id)
596 {
597 let update =
598 snapshot.build_update(old_snapshot, project_id);
599 *old_snapshot = snapshot;
600 for update in split_repository_update(update) {
601 client.send(update)?;
602 }
603 } else {
604 let update = snapshot.initial_update(project_id);
605 for update in split_repository_update(update) {
606 client.send(update)?;
607 }
608 snapshots.insert(snapshot.id, snapshot);
609 }
610 }
611 DownstreamUpdate::RemoveRepository(id) => {
612 client.send(proto::RemoveRepository {
613 project_id,
614 id: id.to_proto(),
615 })?;
616 }
617 }
618 }
619 anyhow::Ok(())
620 })
621 .await
622 .ok();
623 this.update(cx, |this, _| {
624 if let GitStoreState::Local {
625 downstream: downstream_client,
626 ..
627 } = &mut this.state
628 {
629 downstream_client.take();
630 } else {
631 unreachable!("unshared called on remote store");
632 }
633 })
634 }),
635 });
636 }
637 }
638 }
639
640 pub fn unshared(&mut self, _cx: &mut Context<Self>) {
641 match &mut self.state {
642 GitStoreState::Local {
643 downstream: downstream_client,
644 ..
645 } => {
646 downstream_client.take();
647 }
648 GitStoreState::Remote {
649 downstream: downstream_client,
650 ..
651 } => {
652 downstream_client.take();
653 }
654 }
655 self.shared_diffs.clear();
656 }
657
658 pub(crate) fn forget_shared_diffs_for(&mut self, peer_id: &proto::PeerId) {
659 self.shared_diffs.remove(peer_id);
660 }
661
662 pub fn active_repository(&self) -> Option<Entity<Repository>> {
663 self.active_repo_id
664 .as_ref()
665 .map(|id| self.repositories[id].clone())
666 }
667
668 pub fn open_unstaged_diff(
669 &mut self,
670 buffer: Entity<Buffer>,
671 cx: &mut Context<Self>,
672 ) -> Task<Result<Entity<BufferDiff>>> {
673 let buffer_id = buffer.read(cx).remote_id();
674 if let Some(diff_state) = self.diffs.get(&buffer_id)
675 && let Some(unstaged_diff) = diff_state
676 .read(cx)
677 .unstaged_diff
678 .as_ref()
679 .and_then(|weak| weak.upgrade())
680 {
681 if let Some(task) =
682 diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
683 {
684 return cx.background_executor().spawn(async move {
685 task.await;
686 Ok(unstaged_diff)
687 });
688 }
689 return Task::ready(Ok(unstaged_diff));
690 }
691
692 let Some((repo, repo_path)) =
693 self.repository_and_path_for_buffer_id(buffer.read(cx).remote_id(), cx)
694 else {
695 return Task::ready(Err(anyhow!("failed to find git repository for buffer")));
696 };
697
698 let task = self
699 .loading_diffs
700 .entry((buffer_id, DiffKind::Unstaged))
701 .or_insert_with(|| {
702 let staged_text = repo.update(cx, |repo, cx| {
703 repo.load_staged_text(buffer_id, repo_path, cx)
704 });
705 cx.spawn(async move |this, cx| {
706 Self::open_diff_internal(
707 this,
708 DiffKind::Unstaged,
709 staged_text.await.map(DiffBasesChange::SetIndex),
710 buffer,
711 cx,
712 )
713 .await
714 .map_err(Arc::new)
715 })
716 .shared()
717 })
718 .clone();
719
720 cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
721 }
722
723 pub fn open_diff_since(
724 &mut self,
725 oid: Option<git::Oid>,
726 buffer: Entity<Buffer>,
727 repo: Entity<Repository>,
728 cx: &mut Context<Self>,
729 ) -> Task<Result<Entity<BufferDiff>>> {
730 let buffer_id = buffer.read(cx).remote_id();
731
732 if let Some(diff_state) = self.diffs.get(&buffer_id)
733 && let Some(oid_diff) = diff_state.read(cx).oid_diff(oid)
734 {
735 if let Some(task) =
736 diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
737 {
738 return cx.background_executor().spawn(async move {
739 task.await;
740 Ok(oid_diff)
741 });
742 }
743 return Task::ready(Ok(oid_diff));
744 }
745
746 let diff_kind = DiffKind::SinceOid(oid);
747 if let Some(task) = self.loading_diffs.get(&(buffer_id, diff_kind)) {
748 let task = task.clone();
749 return cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) });
750 }
751
752 let task = cx
753 .spawn(async move |this, cx| {
754 let result: Result<Entity<BufferDiff>> = async {
755 let buffer_snapshot = buffer.update(cx, |buffer, _| buffer.snapshot());
756 let language_registry =
757 buffer.update(cx, |buffer, _| buffer.language_registry());
758 let content: Option<Arc<str>> = match oid {
759 None => None,
760 Some(oid) => Some(
761 repo.update(cx, |repo, cx| repo.load_blob_content(oid, cx))
762 .await?
763 .into(),
764 ),
765 };
766 let buffer_diff = cx.new(|cx| BufferDiff::new(&buffer_snapshot, cx));
767
768 buffer_diff
769 .update(cx, |buffer_diff, cx| {
770 buffer_diff.language_changed(
771 buffer_snapshot.language().cloned(),
772 language_registry,
773 cx,
774 );
775 buffer_diff.set_base_text(
776 content.clone(),
777 buffer_snapshot.language().cloned(),
778 buffer_snapshot.text,
779 cx,
780 )
781 })
782 .await?;
783 let unstaged_diff = this
784 .update(cx, |this, cx| this.open_unstaged_diff(buffer.clone(), cx))?
785 .await?;
786 buffer_diff.update(cx, |buffer_diff, _| {
787 buffer_diff.set_secondary_diff(unstaged_diff);
788 });
789
790 this.update(cx, |this, cx| {
791 cx.subscribe(&buffer_diff, Self::on_buffer_diff_event)
792 .detach();
793
794 this.loading_diffs.remove(&(buffer_id, diff_kind));
795
796 let git_store = cx.weak_entity();
797 let diff_state = this
798 .diffs
799 .entry(buffer_id)
800 .or_insert_with(|| cx.new(|_| BufferGitState::new(git_store)));
801
802 diff_state.update(cx, |state, _| {
803 if let Some(oid) = oid {
804 if let Some(content) = content {
805 state.oid_texts.insert(oid, content);
806 }
807 }
808 state.oid_diffs.insert(oid, buffer_diff.downgrade());
809 });
810 })?;
811
812 Ok(buffer_diff)
813 }
814 .await;
815 result.map_err(Arc::new)
816 })
817 .shared();
818
819 self.loading_diffs
820 .insert((buffer_id, diff_kind), task.clone());
821 cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
822 }
823
824 #[ztracing::instrument(skip_all)]
825 pub fn open_uncommitted_diff(
826 &mut self,
827 buffer: Entity<Buffer>,
828 cx: &mut Context<Self>,
829 ) -> Task<Result<Entity<BufferDiff>>> {
830 let buffer_id = buffer.read(cx).remote_id();
831
832 if let Some(diff_state) = self.diffs.get(&buffer_id)
833 && let Some(uncommitted_diff) = diff_state
834 .read(cx)
835 .uncommitted_diff
836 .as_ref()
837 .and_then(|weak| weak.upgrade())
838 {
839 if let Some(task) =
840 diff_state.update(cx, |diff_state, _| diff_state.wait_for_recalculation())
841 {
842 return cx.background_executor().spawn(async move {
843 task.await;
844 Ok(uncommitted_diff)
845 });
846 }
847 return Task::ready(Ok(uncommitted_diff));
848 }
849
850 let Some((repo, repo_path)) =
851 self.repository_and_path_for_buffer_id(buffer.read(cx).remote_id(), cx)
852 else {
853 return Task::ready(Err(anyhow!("failed to find git repository for buffer")));
854 };
855
856 let task = self
857 .loading_diffs
858 .entry((buffer_id, DiffKind::Uncommitted))
859 .or_insert_with(|| {
860 let changes = repo.update(cx, |repo, cx| {
861 repo.load_committed_text(buffer_id, repo_path, cx)
862 });
863
864 // todo(lw): hot foreground spawn
865 cx.spawn(async move |this, cx| {
866 Self::open_diff_internal(this, DiffKind::Uncommitted, changes.await, buffer, cx)
867 .await
868 .map_err(Arc::new)
869 })
870 .shared()
871 })
872 .clone();
873
874 cx.background_spawn(async move { task.await.map_err(|e| anyhow!("{e}")) })
875 }
876
877 #[ztracing::instrument(skip_all)]
878 async fn open_diff_internal(
879 this: WeakEntity<Self>,
880 kind: DiffKind,
881 texts: Result<DiffBasesChange>,
882 buffer_entity: Entity<Buffer>,
883 cx: &mut AsyncApp,
884 ) -> Result<Entity<BufferDiff>> {
885 let diff_bases_change = match texts {
886 Err(e) => {
887 this.update(cx, |this, cx| {
888 let buffer = buffer_entity.read(cx);
889 let buffer_id = buffer.remote_id();
890 this.loading_diffs.remove(&(buffer_id, kind));
891 })?;
892 return Err(e);
893 }
894 Ok(change) => change,
895 };
896
897 this.update(cx, |this, cx| {
898 let buffer = buffer_entity.read(cx);
899 let buffer_id = buffer.remote_id();
900 let language = buffer.language().cloned();
901 let language_registry = buffer.language_registry();
902 let text_snapshot = buffer.text_snapshot();
903 this.loading_diffs.remove(&(buffer_id, kind));
904
905 let git_store = cx.weak_entity();
906 let diff_state = this
907 .diffs
908 .entry(buffer_id)
909 .or_insert_with(|| cx.new(|_| BufferGitState::new(git_store)));
910
911 let diff = cx.new(|cx| BufferDiff::new(&text_snapshot, cx));
912
913 cx.subscribe(&diff, Self::on_buffer_diff_event).detach();
914 diff_state.update(cx, |diff_state, cx| {
915 diff_state.language_changed = true;
916 diff_state.language = language;
917 diff_state.language_registry = language_registry;
918
919 match kind {
920 DiffKind::Unstaged => diff_state.unstaged_diff = Some(diff.downgrade()),
921 DiffKind::Uncommitted => {
922 let unstaged_diff = if let Some(diff) = diff_state.unstaged_diff() {
923 diff
924 } else {
925 let unstaged_diff = cx.new(|cx| BufferDiff::new(&text_snapshot, cx));
926 diff_state.unstaged_diff = Some(unstaged_diff.downgrade());
927 unstaged_diff
928 };
929
930 diff.update(cx, |diff, _| diff.set_secondary_diff(unstaged_diff));
931 diff_state.uncommitted_diff = Some(diff.downgrade())
932 }
933 DiffKind::SinceOid(_) => {
934 unreachable!("open_diff_internal is not used for OID diffs")
935 }
936 }
937
938 diff_state.diff_bases_changed(text_snapshot, Some(diff_bases_change), cx);
939 let rx = diff_state.wait_for_recalculation();
940
941 anyhow::Ok(async move {
942 if let Some(rx) = rx {
943 rx.await;
944 }
945 Ok(diff)
946 })
947 })
948 })??
949 .await
950 }
951
952 pub fn get_unstaged_diff(&self, buffer_id: BufferId, cx: &App) -> Option<Entity<BufferDiff>> {
953 let diff_state = self.diffs.get(&buffer_id)?;
954 diff_state.read(cx).unstaged_diff.as_ref()?.upgrade()
955 }
956
957 pub fn get_uncommitted_diff(
958 &self,
959 buffer_id: BufferId,
960 cx: &App,
961 ) -> Option<Entity<BufferDiff>> {
962 let diff_state = self.diffs.get(&buffer_id)?;
963 diff_state.read(cx).uncommitted_diff.as_ref()?.upgrade()
964 }
965
966 pub fn get_diff_since_oid(
967 &self,
968 buffer_id: BufferId,
969 oid: Option<git::Oid>,
970 cx: &App,
971 ) -> Option<Entity<BufferDiff>> {
972 let diff_state = self.diffs.get(&buffer_id)?;
973 diff_state.read(cx).oid_diff(oid)
974 }
975
976 pub fn open_conflict_set(
977 &mut self,
978 buffer: Entity<Buffer>,
979 cx: &mut Context<Self>,
980 ) -> Entity<ConflictSet> {
981 log::debug!("open conflict set");
982 let buffer_id = buffer.read(cx).remote_id();
983
984 if let Some(git_state) = self.diffs.get(&buffer_id)
985 && let Some(conflict_set) = git_state
986 .read(cx)
987 .conflict_set
988 .as_ref()
989 .and_then(|weak| weak.upgrade())
990 {
991 let conflict_set = conflict_set;
992 let buffer_snapshot = buffer.read(cx).text_snapshot();
993
994 git_state.update(cx, |state, cx| {
995 let _ = state.reparse_conflict_markers(buffer_snapshot, cx);
996 });
997
998 return conflict_set;
999 }
1000
1001 let is_unmerged = self
1002 .repository_and_path_for_buffer_id(buffer_id, cx)
1003 .is_some_and(|(repo, path)| repo.read(cx).snapshot.has_conflict(&path));
1004 let git_store = cx.weak_entity();
1005 let buffer_git_state = self
1006 .diffs
1007 .entry(buffer_id)
1008 .or_insert_with(|| cx.new(|_| BufferGitState::new(git_store)));
1009 let conflict_set = cx.new(|cx| ConflictSet::new(buffer_id, is_unmerged, cx));
1010
1011 self._subscriptions
1012 .push(cx.subscribe(&conflict_set, |_, _, _, cx| {
1013 cx.emit(GitStoreEvent::ConflictsUpdated);
1014 }));
1015
1016 buffer_git_state.update(cx, |state, cx| {
1017 state.conflict_set = Some(conflict_set.downgrade());
1018 let buffer_snapshot = buffer.read(cx).text_snapshot();
1019 let _ = state.reparse_conflict_markers(buffer_snapshot, cx);
1020 });
1021
1022 conflict_set
1023 }
1024
1025 pub fn project_path_git_status(
1026 &self,
1027 project_path: &ProjectPath,
1028 cx: &App,
1029 ) -> Option<FileStatus> {
1030 let (repo, repo_path) = self.repository_and_path_for_project_path(project_path, cx)?;
1031 Some(repo.read(cx).status_for_path(&repo_path)?.status)
1032 }
1033
1034 pub fn checkpoint(&self, cx: &mut App) -> Task<Result<GitStoreCheckpoint>> {
1035 let mut work_directory_abs_paths = Vec::new();
1036 let mut checkpoints = Vec::new();
1037 for repository in self.repositories.values() {
1038 repository.update(cx, |repository, _| {
1039 work_directory_abs_paths.push(repository.snapshot.work_directory_abs_path.clone());
1040 checkpoints.push(repository.checkpoint().map(|checkpoint| checkpoint?));
1041 });
1042 }
1043
1044 cx.background_executor().spawn(async move {
1045 let checkpoints = future::try_join_all(checkpoints).await?;
1046 Ok(GitStoreCheckpoint {
1047 checkpoints_by_work_dir_abs_path: work_directory_abs_paths
1048 .into_iter()
1049 .zip(checkpoints)
1050 .collect(),
1051 })
1052 })
1053 }
1054
1055 pub fn restore_checkpoint(
1056 &self,
1057 checkpoint: GitStoreCheckpoint,
1058 cx: &mut App,
1059 ) -> Task<Result<()>> {
1060 let repositories_by_work_dir_abs_path = self
1061 .repositories
1062 .values()
1063 .map(|repo| (repo.read(cx).snapshot.work_directory_abs_path.clone(), repo))
1064 .collect::<HashMap<_, _>>();
1065
1066 let mut tasks = Vec::new();
1067 for (work_dir_abs_path, checkpoint) in checkpoint.checkpoints_by_work_dir_abs_path {
1068 if let Some(repository) = repositories_by_work_dir_abs_path.get(&work_dir_abs_path) {
1069 let restore = repository.update(cx, |repository, _| {
1070 repository.restore_checkpoint(checkpoint)
1071 });
1072 tasks.push(async move { restore.await? });
1073 }
1074 }
1075 cx.background_spawn(async move {
1076 future::try_join_all(tasks).await?;
1077 Ok(())
1078 })
1079 }
1080
1081 /// Compares two checkpoints, returning true if they are equal.
1082 pub fn compare_checkpoints(
1083 &self,
1084 left: GitStoreCheckpoint,
1085 mut right: GitStoreCheckpoint,
1086 cx: &mut App,
1087 ) -> Task<Result<bool>> {
1088 let repositories_by_work_dir_abs_path = self
1089 .repositories
1090 .values()
1091 .map(|repo| (repo.read(cx).snapshot.work_directory_abs_path.clone(), repo))
1092 .collect::<HashMap<_, _>>();
1093
1094 let mut tasks = Vec::new();
1095 for (work_dir_abs_path, left_checkpoint) in left.checkpoints_by_work_dir_abs_path {
1096 if let Some(right_checkpoint) = right
1097 .checkpoints_by_work_dir_abs_path
1098 .remove(&work_dir_abs_path)
1099 {
1100 if let Some(repository) = repositories_by_work_dir_abs_path.get(&work_dir_abs_path)
1101 {
1102 let compare = repository.update(cx, |repository, _| {
1103 repository.compare_checkpoints(left_checkpoint, right_checkpoint)
1104 });
1105
1106 tasks.push(async move { compare.await? });
1107 }
1108 } else {
1109 return Task::ready(Ok(false));
1110 }
1111 }
1112 cx.background_spawn(async move {
1113 Ok(future::try_join_all(tasks)
1114 .await?
1115 .into_iter()
1116 .all(|result| result))
1117 })
1118 }
1119
1120 /// Blames a buffer.
1121 pub fn blame_buffer(
1122 &self,
1123 buffer: &Entity<Buffer>,
1124 version: Option<clock::Global>,
1125 cx: &mut Context<Self>,
1126 ) -> Task<Result<Option<Blame>>> {
1127 let buffer = buffer.read(cx);
1128 let Some((repo, repo_path)) =
1129 self.repository_and_path_for_buffer_id(buffer.remote_id(), cx)
1130 else {
1131 return Task::ready(Err(anyhow!("failed to find a git repository for buffer")));
1132 };
1133 let content = match &version {
1134 Some(version) => buffer.rope_for_version(version),
1135 None => buffer.as_rope().clone(),
1136 };
1137 let line_ending = buffer.line_ending();
1138 let version = version.unwrap_or(buffer.version());
1139 let buffer_id = buffer.remote_id();
1140
1141 let repo = repo.downgrade();
1142 cx.spawn(async move |_, cx| {
1143 let repository_state = repo
1144 .update(cx, |repo, _| repo.repository_state.clone())?
1145 .await
1146 .map_err(|err| anyhow::anyhow!(err))?;
1147 match repository_state {
1148 RepositoryState::Local(LocalRepositoryState { backend, .. }) => backend
1149 .blame(repo_path.clone(), content, line_ending)
1150 .await
1151 .with_context(|| format!("Failed to blame {:?}", repo_path.as_ref()))
1152 .map(Some),
1153 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
1154 let response = client
1155 .request(proto::BlameBuffer {
1156 project_id: project_id.to_proto(),
1157 buffer_id: buffer_id.into(),
1158 version: serialize_version(&version),
1159 })
1160 .await?;
1161 Ok(deserialize_blame_buffer_response(response))
1162 }
1163 }
1164 })
1165 }
1166
1167 pub fn file_history(
1168 &self,
1169 repo: &Entity<Repository>,
1170 path: RepoPath,
1171 cx: &mut App,
1172 ) -> Task<Result<git::repository::FileHistory>> {
1173 let rx = repo.update(cx, |repo, _| repo.file_history(path));
1174
1175 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
1176 }
1177
1178 pub fn file_history_paginated(
1179 &self,
1180 repo: &Entity<Repository>,
1181 path: RepoPath,
1182 skip: usize,
1183 limit: Option<usize>,
1184 cx: &mut App,
1185 ) -> Task<Result<git::repository::FileHistory>> {
1186 let rx = repo.update(cx, |repo, _| repo.file_history_paginated(path, skip, limit));
1187
1188 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
1189 }
1190
1191 pub fn get_permalink_to_line(
1192 &self,
1193 buffer: &Entity<Buffer>,
1194 selection: Range<u32>,
1195 cx: &mut App,
1196 ) -> Task<Result<url::Url>> {
1197 let Some(file) = File::from_dyn(buffer.read(cx).file()) else {
1198 return Task::ready(Err(anyhow!("buffer has no file")));
1199 };
1200
1201 let Some((repo, repo_path)) = self.repository_and_path_for_project_path(
1202 &(file.worktree.read(cx).id(), file.path.clone()).into(),
1203 cx,
1204 ) else {
1205 // If we're not in a Git repo, check whether this is a Rust source
1206 // file in the Cargo registry (presumably opened with go-to-definition
1207 // from a normal Rust file). If so, we can put together a permalink
1208 // using crate metadata.
1209 if buffer
1210 .read(cx)
1211 .language()
1212 .is_none_or(|lang| lang.name() != "Rust".into())
1213 {
1214 return Task::ready(Err(anyhow!("no permalink available")));
1215 }
1216 let file_path = file.worktree.read(cx).absolutize(&file.path);
1217 return cx.spawn(async move |cx| {
1218 let provider_registry = cx.update(GitHostingProviderRegistry::default_global);
1219 get_permalink_in_rust_registry_src(provider_registry, file_path, selection)
1220 .context("no permalink available")
1221 });
1222 };
1223
1224 let buffer_id = buffer.read(cx).remote_id();
1225 let branch = repo.read(cx).branch.clone();
1226 let remote = branch
1227 .as_ref()
1228 .and_then(|b| b.upstream.as_ref())
1229 .and_then(|b| b.remote_name())
1230 .unwrap_or("origin")
1231 .to_string();
1232
1233 let rx = repo.update(cx, |repo, _| {
1234 repo.send_job(None, move |state, cx| async move {
1235 match state {
1236 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
1237 let origin_url = backend
1238 .remote_url(&remote)
1239 .await
1240 .with_context(|| format!("remote \"{remote}\" not found"))?;
1241
1242 let sha = backend.head_sha().await.context("reading HEAD SHA")?;
1243
1244 let provider_registry =
1245 cx.update(GitHostingProviderRegistry::default_global);
1246
1247 let (provider, remote) =
1248 parse_git_remote_url(provider_registry, &origin_url)
1249 .context("parsing Git remote URL")?;
1250
1251 Ok(provider.build_permalink(
1252 remote,
1253 BuildPermalinkParams::new(&sha, &repo_path, Some(selection)),
1254 ))
1255 }
1256 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
1257 let response = client
1258 .request(proto::GetPermalinkToLine {
1259 project_id: project_id.to_proto(),
1260 buffer_id: buffer_id.into(),
1261 selection: Some(proto::Range {
1262 start: selection.start as u64,
1263 end: selection.end as u64,
1264 }),
1265 })
1266 .await?;
1267
1268 url::Url::parse(&response.permalink).context("failed to parse permalink")
1269 }
1270 }
1271 })
1272 });
1273 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
1274 }
1275
1276 fn downstream_client(&self) -> Option<(AnyProtoClient, ProjectId)> {
1277 match &self.state {
1278 GitStoreState::Local {
1279 downstream: downstream_client,
1280 ..
1281 } => downstream_client
1282 .as_ref()
1283 .map(|state| (state.client.clone(), state.project_id)),
1284 GitStoreState::Remote {
1285 downstream: downstream_client,
1286 ..
1287 } => downstream_client.clone(),
1288 }
1289 }
1290
1291 fn upstream_client(&self) -> Option<AnyProtoClient> {
1292 match &self.state {
1293 GitStoreState::Local { .. } => None,
1294 GitStoreState::Remote {
1295 upstream_client, ..
1296 } => Some(upstream_client.clone()),
1297 }
1298 }
1299
1300 fn on_worktree_store_event(
1301 &mut self,
1302 worktree_store: Entity<WorktreeStore>,
1303 event: &WorktreeStoreEvent,
1304 cx: &mut Context<Self>,
1305 ) {
1306 let GitStoreState::Local {
1307 project_environment,
1308 downstream,
1309 next_repository_id,
1310 fs,
1311 } = &self.state
1312 else {
1313 return;
1314 };
1315
1316 match event {
1317 WorktreeStoreEvent::WorktreeUpdatedEntries(worktree_id, updated_entries) => {
1318 if let Some(worktree) = self
1319 .worktree_store
1320 .read(cx)
1321 .worktree_for_id(*worktree_id, cx)
1322 {
1323 let paths_by_git_repo =
1324 self.process_updated_entries(&worktree, updated_entries, cx);
1325 let downstream = downstream
1326 .as_ref()
1327 .map(|downstream| downstream.updates_tx.clone());
1328 cx.spawn(async move |_, cx| {
1329 let paths_by_git_repo = paths_by_git_repo.await;
1330 for (repo, paths) in paths_by_git_repo {
1331 repo.update(cx, |repo, cx| {
1332 repo.paths_changed(paths, downstream.clone(), cx);
1333 });
1334 }
1335 })
1336 .detach();
1337 }
1338 }
1339 WorktreeStoreEvent::WorktreeUpdatedGitRepositories(worktree_id, changed_repos) => {
1340 let Some(worktree) = worktree_store.read(cx).worktree_for_id(*worktree_id, cx)
1341 else {
1342 return;
1343 };
1344 if !worktree.read(cx).is_visible() {
1345 log::debug!(
1346 "not adding repositories for local worktree {:?} because it's not visible",
1347 worktree.read(cx).abs_path()
1348 );
1349 return;
1350 }
1351 self.update_repositories_from_worktree(
1352 *worktree_id,
1353 project_environment.clone(),
1354 next_repository_id.clone(),
1355 downstream
1356 .as_ref()
1357 .map(|downstream| downstream.updates_tx.clone()),
1358 changed_repos.clone(),
1359 fs.clone(),
1360 cx,
1361 );
1362 self.local_worktree_git_repos_changed(worktree, changed_repos, cx);
1363 }
1364 WorktreeStoreEvent::WorktreeRemoved(_entity_id, worktree_id) => {
1365 let repos_without_worktree: Vec<RepositoryId> = self
1366 .worktree_ids
1367 .iter_mut()
1368 .filter_map(|(repo_id, worktree_ids)| {
1369 worktree_ids.remove(worktree_id);
1370 if worktree_ids.is_empty() {
1371 Some(*repo_id)
1372 } else {
1373 None
1374 }
1375 })
1376 .collect();
1377 let is_active_repo_removed = repos_without_worktree
1378 .iter()
1379 .any(|repo_id| self.active_repo_id == Some(*repo_id));
1380
1381 for repo_id in repos_without_worktree {
1382 self.repositories.remove(&repo_id);
1383 self.worktree_ids.remove(&repo_id);
1384 if let Some(updates_tx) =
1385 downstream.as_ref().map(|downstream| &downstream.updates_tx)
1386 {
1387 updates_tx
1388 .unbounded_send(DownstreamUpdate::RemoveRepository(repo_id))
1389 .ok();
1390 }
1391 }
1392
1393 if is_active_repo_removed {
1394 if let Some((&repo_id, _)) = self.repositories.iter().next() {
1395 self.active_repo_id = Some(repo_id);
1396 cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(repo_id)));
1397 } else {
1398 self.active_repo_id = None;
1399 cx.emit(GitStoreEvent::ActiveRepositoryChanged(None));
1400 }
1401 }
1402 }
1403 _ => {}
1404 }
1405 }
1406 fn on_repository_event(
1407 &mut self,
1408 repo: Entity<Repository>,
1409 event: &RepositoryEvent,
1410 cx: &mut Context<Self>,
1411 ) {
1412 let id = repo.read(cx).id;
1413 let repo_snapshot = repo.read(cx).snapshot.clone();
1414 for (buffer_id, diff) in self.diffs.iter() {
1415 if let Some((buffer_repo, repo_path)) =
1416 self.repository_and_path_for_buffer_id(*buffer_id, cx)
1417 && buffer_repo == repo
1418 {
1419 diff.update(cx, |diff, cx| {
1420 if let Some(conflict_set) = &diff.conflict_set {
1421 let conflict_status_changed =
1422 conflict_set.update(cx, |conflict_set, cx| {
1423 let has_conflict = repo_snapshot.has_conflict(&repo_path);
1424 conflict_set.set_has_conflict(has_conflict, cx)
1425 })?;
1426 if conflict_status_changed {
1427 let buffer_store = self.buffer_store.read(cx);
1428 if let Some(buffer) = buffer_store.get(*buffer_id) {
1429 let _ = diff
1430 .reparse_conflict_markers(buffer.read(cx).text_snapshot(), cx);
1431 }
1432 }
1433 }
1434 anyhow::Ok(())
1435 })
1436 .ok();
1437 }
1438 }
1439 cx.emit(GitStoreEvent::RepositoryUpdated(
1440 id,
1441 event.clone(),
1442 self.active_repo_id == Some(id),
1443 ))
1444 }
1445
1446 fn on_jobs_updated(&mut self, _: Entity<Repository>, _: &JobsUpdated, cx: &mut Context<Self>) {
1447 cx.emit(GitStoreEvent::JobsUpdated)
1448 }
1449
1450 /// Update our list of repositories and schedule git scans in response to a notification from a worktree,
1451 fn update_repositories_from_worktree(
1452 &mut self,
1453 worktree_id: WorktreeId,
1454 project_environment: Entity<ProjectEnvironment>,
1455 next_repository_id: Arc<AtomicU64>,
1456 updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
1457 updated_git_repositories: UpdatedGitRepositoriesSet,
1458 fs: Arc<dyn Fs>,
1459 cx: &mut Context<Self>,
1460 ) {
1461 let mut removed_ids = Vec::new();
1462 for update in updated_git_repositories.iter() {
1463 if let Some((id, existing)) = self.repositories.iter().find(|(_, repo)| {
1464 let existing_work_directory_abs_path =
1465 repo.read(cx).work_directory_abs_path.clone();
1466 Some(&existing_work_directory_abs_path)
1467 == update.old_work_directory_abs_path.as_ref()
1468 || Some(&existing_work_directory_abs_path)
1469 == update.new_work_directory_abs_path.as_ref()
1470 }) {
1471 let repo_id = *id;
1472 if let Some(new_work_directory_abs_path) =
1473 update.new_work_directory_abs_path.clone()
1474 {
1475 self.worktree_ids
1476 .entry(repo_id)
1477 .or_insert_with(HashSet::new)
1478 .insert(worktree_id);
1479 existing.update(cx, |existing, cx| {
1480 existing.snapshot.work_directory_abs_path = new_work_directory_abs_path;
1481 existing.schedule_scan(updates_tx.clone(), cx);
1482 });
1483 } else {
1484 if let Some(worktree_ids) = self.worktree_ids.get_mut(&repo_id) {
1485 worktree_ids.remove(&worktree_id);
1486 if worktree_ids.is_empty() {
1487 removed_ids.push(repo_id);
1488 }
1489 }
1490 }
1491 } else if let UpdatedGitRepository {
1492 new_work_directory_abs_path: Some(work_directory_abs_path),
1493 dot_git_abs_path: Some(dot_git_abs_path),
1494 repository_dir_abs_path: Some(_repository_dir_abs_path),
1495 common_dir_abs_path: Some(_common_dir_abs_path),
1496 ..
1497 } = update
1498 {
1499 let id = RepositoryId(next_repository_id.fetch_add(1, atomic::Ordering::Release));
1500 let git_store = cx.weak_entity();
1501 let repo = cx.new(|cx| {
1502 let mut repo = Repository::local(
1503 id,
1504 work_directory_abs_path.clone(),
1505 dot_git_abs_path.clone(),
1506 project_environment.downgrade(),
1507 fs.clone(),
1508 git_store,
1509 cx,
1510 );
1511 if let Some(updates_tx) = updates_tx.as_ref() {
1512 // trigger an empty `UpdateRepository` to ensure remote active_repo_id is set correctly
1513 updates_tx
1514 .unbounded_send(DownstreamUpdate::UpdateRepository(repo.snapshot()))
1515 .ok();
1516 }
1517 repo.schedule_scan(updates_tx.clone(), cx);
1518 repo
1519 });
1520 self._subscriptions
1521 .push(cx.subscribe(&repo, Self::on_repository_event));
1522 self._subscriptions
1523 .push(cx.subscribe(&repo, Self::on_jobs_updated));
1524 self.repositories.insert(id, repo);
1525 self.worktree_ids.insert(id, HashSet::from([worktree_id]));
1526 cx.emit(GitStoreEvent::RepositoryAdded);
1527 self.active_repo_id.get_or_insert_with(|| {
1528 cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
1529 id
1530 });
1531 }
1532 }
1533
1534 for id in removed_ids {
1535 if self.active_repo_id == Some(id) {
1536 self.active_repo_id = None;
1537 cx.emit(GitStoreEvent::ActiveRepositoryChanged(None));
1538 }
1539 self.repositories.remove(&id);
1540 if let Some(updates_tx) = updates_tx.as_ref() {
1541 updates_tx
1542 .unbounded_send(DownstreamUpdate::RemoveRepository(id))
1543 .ok();
1544 }
1545 }
1546 }
1547
1548 fn on_buffer_store_event(
1549 &mut self,
1550 _: Entity<BufferStore>,
1551 event: &BufferStoreEvent,
1552 cx: &mut Context<Self>,
1553 ) {
1554 match event {
1555 BufferStoreEvent::BufferAdded(buffer) => {
1556 cx.subscribe(buffer, |this, buffer, event, cx| {
1557 if let BufferEvent::LanguageChanged(_) = event {
1558 let buffer_id = buffer.read(cx).remote_id();
1559 if let Some(diff_state) = this.diffs.get(&buffer_id) {
1560 diff_state.update(cx, |diff_state, cx| {
1561 diff_state.buffer_language_changed(buffer, cx);
1562 });
1563 }
1564 }
1565 })
1566 .detach();
1567 }
1568 BufferStoreEvent::SharedBufferClosed(peer_id, buffer_id) => {
1569 if let Some(diffs) = self.shared_diffs.get_mut(peer_id) {
1570 diffs.remove(buffer_id);
1571 }
1572 }
1573 BufferStoreEvent::BufferDropped(buffer_id) => {
1574 self.diffs.remove(buffer_id);
1575 for diffs in self.shared_diffs.values_mut() {
1576 diffs.remove(buffer_id);
1577 }
1578 }
1579 BufferStoreEvent::BufferChangedFilePath { buffer, .. } => {
1580 // Whenever a buffer's file path changes, it's possible that the
1581 // new path is actually a path that is being tracked by a git
1582 // repository. In that case, we'll want to update the buffer's
1583 // `BufferDiffState`, in case it already has one.
1584 let buffer_id = buffer.read(cx).remote_id();
1585 let diff_state = self.diffs.get(&buffer_id);
1586 let repo = self.repository_and_path_for_buffer_id(buffer_id, cx);
1587
1588 if let Some(diff_state) = diff_state
1589 && let Some((repo, repo_path)) = repo
1590 {
1591 let buffer = buffer.clone();
1592 let diff_state = diff_state.clone();
1593
1594 cx.spawn(async move |_git_store, cx| {
1595 async {
1596 let diff_bases_change = repo
1597 .update(cx, |repo, cx| {
1598 repo.load_committed_text(buffer_id, repo_path, cx)
1599 })
1600 .await?;
1601
1602 diff_state.update(cx, |diff_state, cx| {
1603 let buffer_snapshot = buffer.read(cx).text_snapshot();
1604 diff_state.diff_bases_changed(
1605 buffer_snapshot,
1606 Some(diff_bases_change),
1607 cx,
1608 );
1609 });
1610 anyhow::Ok(())
1611 }
1612 .await
1613 .log_err();
1614 })
1615 .detach();
1616 }
1617 }
1618 _ => {}
1619 }
1620 }
1621
1622 pub fn recalculate_buffer_diffs(
1623 &mut self,
1624 buffers: Vec<Entity<Buffer>>,
1625 cx: &mut Context<Self>,
1626 ) -> impl Future<Output = ()> + use<> {
1627 let mut futures = Vec::new();
1628 for buffer in buffers {
1629 if let Some(diff_state) = self.diffs.get_mut(&buffer.read(cx).remote_id()) {
1630 let buffer = buffer.read(cx).text_snapshot();
1631 diff_state.update(cx, |diff_state, cx| {
1632 diff_state.recalculate_diffs(buffer.clone(), cx);
1633 futures.extend(diff_state.wait_for_recalculation().map(FutureExt::boxed));
1634 });
1635 futures.push(diff_state.update(cx, |diff_state, cx| {
1636 diff_state
1637 .reparse_conflict_markers(buffer, cx)
1638 .map(|_| {})
1639 .boxed()
1640 }));
1641 }
1642 }
1643 async move {
1644 futures::future::join_all(futures).await;
1645 }
1646 }
1647
1648 fn on_buffer_diff_event(
1649 &mut self,
1650 diff: Entity<buffer_diff::BufferDiff>,
1651 event: &BufferDiffEvent,
1652 cx: &mut Context<Self>,
1653 ) {
1654 if let BufferDiffEvent::HunksStagedOrUnstaged(new_index_text) = event {
1655 let buffer_id = diff.read(cx).buffer_id;
1656 if let Some(diff_state) = self.diffs.get(&buffer_id) {
1657 let hunk_staging_operation_count = diff_state.update(cx, |diff_state, _| {
1658 diff_state.hunk_staging_operation_count += 1;
1659 diff_state.hunk_staging_operation_count
1660 });
1661 if let Some((repo, path)) = self.repository_and_path_for_buffer_id(buffer_id, cx) {
1662 let recv = repo.update(cx, |repo, cx| {
1663 log::debug!("hunks changed for {}", path.as_unix_str());
1664 repo.spawn_set_index_text_job(
1665 path,
1666 new_index_text.as_ref().map(|rope| rope.to_string()),
1667 Some(hunk_staging_operation_count),
1668 cx,
1669 )
1670 });
1671 let diff = diff.downgrade();
1672 cx.spawn(async move |this, cx| {
1673 if let Ok(Err(error)) = cx.background_spawn(recv).await {
1674 diff.update(cx, |diff, cx| {
1675 diff.clear_pending_hunks(cx);
1676 })
1677 .ok();
1678 this.update(cx, |_, cx| cx.emit(GitStoreEvent::IndexWriteError(error)))
1679 .ok();
1680 }
1681 })
1682 .detach();
1683 }
1684 }
1685 }
1686 }
1687
1688 fn local_worktree_git_repos_changed(
1689 &mut self,
1690 worktree: Entity<Worktree>,
1691 changed_repos: &UpdatedGitRepositoriesSet,
1692 cx: &mut Context<Self>,
1693 ) {
1694 log::debug!("local worktree repos changed");
1695 debug_assert!(worktree.read(cx).is_local());
1696
1697 for repository in self.repositories.values() {
1698 repository.update(cx, |repository, cx| {
1699 let repo_abs_path = &repository.work_directory_abs_path;
1700 if changed_repos.iter().any(|update| {
1701 update.old_work_directory_abs_path.as_ref() == Some(repo_abs_path)
1702 || update.new_work_directory_abs_path.as_ref() == Some(repo_abs_path)
1703 }) {
1704 repository.reload_buffer_diff_bases(cx);
1705 }
1706 });
1707 }
1708 }
1709
1710 pub fn repositories(&self) -> &HashMap<RepositoryId, Entity<Repository>> {
1711 &self.repositories
1712 }
1713
1714 pub fn status_for_buffer_id(&self, buffer_id: BufferId, cx: &App) -> Option<FileStatus> {
1715 let (repo, path) = self.repository_and_path_for_buffer_id(buffer_id, cx)?;
1716 let status = repo.read(cx).snapshot.status_for_path(&path)?;
1717 Some(status.status)
1718 }
1719
1720 pub fn repository_and_path_for_buffer_id(
1721 &self,
1722 buffer_id: BufferId,
1723 cx: &App,
1724 ) -> Option<(Entity<Repository>, RepoPath)> {
1725 let buffer = self.buffer_store.read(cx).get(buffer_id)?;
1726 let project_path = buffer.read(cx).project_path(cx)?;
1727 self.repository_and_path_for_project_path(&project_path, cx)
1728 }
1729
1730 pub fn repository_and_path_for_project_path(
1731 &self,
1732 path: &ProjectPath,
1733 cx: &App,
1734 ) -> Option<(Entity<Repository>, RepoPath)> {
1735 let abs_path = self.worktree_store.read(cx).absolutize(path, cx)?;
1736 self.repositories
1737 .values()
1738 .filter_map(|repo| {
1739 let repo_path = repo.read(cx).abs_path_to_repo_path(&abs_path)?;
1740 Some((repo.clone(), repo_path))
1741 })
1742 .max_by_key(|(repo, _)| repo.read(cx).work_directory_abs_path.clone())
1743 }
1744
1745 pub fn git_init(
1746 &self,
1747 path: Arc<Path>,
1748 fallback_branch_name: String,
1749 cx: &App,
1750 ) -> Task<Result<()>> {
1751 match &self.state {
1752 GitStoreState::Local { fs, .. } => {
1753 let fs = fs.clone();
1754 cx.background_executor()
1755 .spawn(async move { fs.git_init(&path, fallback_branch_name).await })
1756 }
1757 GitStoreState::Remote {
1758 upstream_client,
1759 upstream_project_id: project_id,
1760 ..
1761 } => {
1762 let client = upstream_client.clone();
1763 let project_id = *project_id;
1764 cx.background_executor().spawn(async move {
1765 client
1766 .request(proto::GitInit {
1767 project_id: project_id,
1768 abs_path: path.to_string_lossy().into_owned(),
1769 fallback_branch_name,
1770 })
1771 .await?;
1772 Ok(())
1773 })
1774 }
1775 }
1776 }
1777
1778 pub fn git_clone(
1779 &self,
1780 repo: String,
1781 path: impl Into<Arc<std::path::Path>>,
1782 cx: &App,
1783 ) -> Task<Result<()>> {
1784 let path = path.into();
1785 match &self.state {
1786 GitStoreState::Local { fs, .. } => {
1787 let fs = fs.clone();
1788 cx.background_executor()
1789 .spawn(async move { fs.git_clone(&repo, &path).await })
1790 }
1791 GitStoreState::Remote {
1792 upstream_client,
1793 upstream_project_id,
1794 ..
1795 } => {
1796 if upstream_client.is_via_collab() {
1797 return Task::ready(Err(anyhow!(
1798 "Git Clone isn't supported for project guests"
1799 )));
1800 }
1801 let request = upstream_client.request(proto::GitClone {
1802 project_id: *upstream_project_id,
1803 abs_path: path.to_string_lossy().into_owned(),
1804 remote_repo: repo,
1805 });
1806
1807 cx.background_spawn(async move {
1808 let result = request.await?;
1809
1810 match result.success {
1811 true => Ok(()),
1812 false => Err(anyhow!("Git Clone failed")),
1813 }
1814 })
1815 }
1816 }
1817 }
1818
1819 async fn handle_update_repository(
1820 this: Entity<Self>,
1821 envelope: TypedEnvelope<proto::UpdateRepository>,
1822 mut cx: AsyncApp,
1823 ) -> Result<()> {
1824 this.update(&mut cx, |this, cx| {
1825 let path_style = this.worktree_store.read(cx).path_style();
1826 let mut update = envelope.payload;
1827
1828 let id = RepositoryId::from_proto(update.id);
1829 let client = this.upstream_client().context("no upstream client")?;
1830
1831 let mut repo_subscription = None;
1832 let repo = this.repositories.entry(id).or_insert_with(|| {
1833 let git_store = cx.weak_entity();
1834 let repo = cx.new(|cx| {
1835 Repository::remote(
1836 id,
1837 Path::new(&update.abs_path).into(),
1838 path_style,
1839 ProjectId(update.project_id),
1840 client,
1841 git_store,
1842 cx,
1843 )
1844 });
1845 repo_subscription = Some(cx.subscribe(&repo, Self::on_repository_event));
1846 cx.emit(GitStoreEvent::RepositoryAdded);
1847 repo
1848 });
1849 this._subscriptions.extend(repo_subscription);
1850
1851 repo.update(cx, {
1852 let update = update.clone();
1853 |repo, cx| repo.apply_remote_update(update, cx)
1854 })?;
1855
1856 this.active_repo_id.get_or_insert_with(|| {
1857 cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
1858 id
1859 });
1860
1861 if let Some((client, project_id)) = this.downstream_client() {
1862 update.project_id = project_id.to_proto();
1863 client.send(update).log_err();
1864 }
1865 Ok(())
1866 })
1867 }
1868
1869 async fn handle_remove_repository(
1870 this: Entity<Self>,
1871 envelope: TypedEnvelope<proto::RemoveRepository>,
1872 mut cx: AsyncApp,
1873 ) -> Result<()> {
1874 this.update(&mut cx, |this, cx| {
1875 let mut update = envelope.payload;
1876 let id = RepositoryId::from_proto(update.id);
1877 this.repositories.remove(&id);
1878 if let Some((client, project_id)) = this.downstream_client() {
1879 update.project_id = project_id.to_proto();
1880 client.send(update).log_err();
1881 }
1882 if this.active_repo_id == Some(id) {
1883 this.active_repo_id = None;
1884 cx.emit(GitStoreEvent::ActiveRepositoryChanged(None));
1885 }
1886 cx.emit(GitStoreEvent::RepositoryRemoved(id));
1887 });
1888 Ok(())
1889 }
1890
1891 async fn handle_git_init(
1892 this: Entity<Self>,
1893 envelope: TypedEnvelope<proto::GitInit>,
1894 cx: AsyncApp,
1895 ) -> Result<proto::Ack> {
1896 let path: Arc<Path> = PathBuf::from(envelope.payload.abs_path).into();
1897 let name = envelope.payload.fallback_branch_name;
1898 cx.update(|cx| this.read(cx).git_init(path, name, cx))
1899 .await?;
1900
1901 Ok(proto::Ack {})
1902 }
1903
1904 async fn handle_git_clone(
1905 this: Entity<Self>,
1906 envelope: TypedEnvelope<proto::GitClone>,
1907 cx: AsyncApp,
1908 ) -> Result<proto::GitCloneResponse> {
1909 let path: Arc<Path> = PathBuf::from(envelope.payload.abs_path).into();
1910 let repo_name = envelope.payload.remote_repo;
1911 let result = cx
1912 .update(|cx| this.read(cx).git_clone(repo_name, path, cx))
1913 .await;
1914
1915 Ok(proto::GitCloneResponse {
1916 success: result.is_ok(),
1917 })
1918 }
1919
1920 async fn handle_fetch(
1921 this: Entity<Self>,
1922 envelope: TypedEnvelope<proto::Fetch>,
1923 mut cx: AsyncApp,
1924 ) -> Result<proto::RemoteMessageResponse> {
1925 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1926 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1927 let fetch_options = FetchOptions::from_proto(envelope.payload.remote);
1928 let askpass_id = envelope.payload.askpass_id;
1929
1930 let askpass = make_remote_delegate(
1931 this,
1932 envelope.payload.project_id,
1933 repository_id,
1934 askpass_id,
1935 &mut cx,
1936 );
1937
1938 let remote_output = repository_handle
1939 .update(&mut cx, |repository_handle, cx| {
1940 repository_handle.fetch(fetch_options, askpass, cx)
1941 })
1942 .await??;
1943
1944 Ok(proto::RemoteMessageResponse {
1945 stdout: remote_output.stdout,
1946 stderr: remote_output.stderr,
1947 })
1948 }
1949
1950 async fn handle_push(
1951 this: Entity<Self>,
1952 envelope: TypedEnvelope<proto::Push>,
1953 mut cx: AsyncApp,
1954 ) -> Result<proto::RemoteMessageResponse> {
1955 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
1956 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
1957
1958 let askpass_id = envelope.payload.askpass_id;
1959 let askpass = make_remote_delegate(
1960 this,
1961 envelope.payload.project_id,
1962 repository_id,
1963 askpass_id,
1964 &mut cx,
1965 );
1966
1967 let options = envelope
1968 .payload
1969 .options
1970 .as_ref()
1971 .map(|_| match envelope.payload.options() {
1972 proto::push::PushOptions::SetUpstream => git::repository::PushOptions::SetUpstream,
1973 proto::push::PushOptions::Force => git::repository::PushOptions::Force,
1974 });
1975
1976 let branch_name = envelope.payload.branch_name.into();
1977 let remote_branch_name = envelope.payload.remote_branch_name.into();
1978 let remote_name = envelope.payload.remote_name.into();
1979
1980 let remote_output = repository_handle
1981 .update(&mut cx, |repository_handle, cx| {
1982 repository_handle.push(
1983 branch_name,
1984 remote_branch_name,
1985 remote_name,
1986 options,
1987 askpass,
1988 cx,
1989 )
1990 })
1991 .await??;
1992 Ok(proto::RemoteMessageResponse {
1993 stdout: remote_output.stdout,
1994 stderr: remote_output.stderr,
1995 })
1996 }
1997
1998 async fn handle_pull(
1999 this: Entity<Self>,
2000 envelope: TypedEnvelope<proto::Pull>,
2001 mut cx: AsyncApp,
2002 ) -> Result<proto::RemoteMessageResponse> {
2003 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2004 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2005 let askpass_id = envelope.payload.askpass_id;
2006 let askpass = make_remote_delegate(
2007 this,
2008 envelope.payload.project_id,
2009 repository_id,
2010 askpass_id,
2011 &mut cx,
2012 );
2013
2014 let branch_name = envelope.payload.branch_name.map(|name| name.into());
2015 let remote_name = envelope.payload.remote_name.into();
2016 let rebase = envelope.payload.rebase;
2017
2018 let remote_message = repository_handle
2019 .update(&mut cx, |repository_handle, cx| {
2020 repository_handle.pull(branch_name, remote_name, rebase, askpass, cx)
2021 })
2022 .await??;
2023
2024 Ok(proto::RemoteMessageResponse {
2025 stdout: remote_message.stdout,
2026 stderr: remote_message.stderr,
2027 })
2028 }
2029
2030 async fn handle_stage(
2031 this: Entity<Self>,
2032 envelope: TypedEnvelope<proto::Stage>,
2033 mut cx: AsyncApp,
2034 ) -> Result<proto::Ack> {
2035 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2036 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2037
2038 let entries = envelope
2039 .payload
2040 .paths
2041 .into_iter()
2042 .map(|path| RepoPath::new(&path))
2043 .collect::<Result<Vec<_>>>()?;
2044
2045 repository_handle
2046 .update(&mut cx, |repository_handle, cx| {
2047 repository_handle.stage_entries(entries, cx)
2048 })
2049 .await?;
2050 Ok(proto::Ack {})
2051 }
2052
2053 async fn handle_unstage(
2054 this: Entity<Self>,
2055 envelope: TypedEnvelope<proto::Unstage>,
2056 mut cx: AsyncApp,
2057 ) -> Result<proto::Ack> {
2058 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2059 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2060
2061 let entries = envelope
2062 .payload
2063 .paths
2064 .into_iter()
2065 .map(|path| RepoPath::new(&path))
2066 .collect::<Result<Vec<_>>>()?;
2067
2068 repository_handle
2069 .update(&mut cx, |repository_handle, cx| {
2070 repository_handle.unstage_entries(entries, cx)
2071 })
2072 .await?;
2073
2074 Ok(proto::Ack {})
2075 }
2076
2077 async fn handle_stash(
2078 this: Entity<Self>,
2079 envelope: TypedEnvelope<proto::Stash>,
2080 mut cx: AsyncApp,
2081 ) -> Result<proto::Ack> {
2082 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2083 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2084
2085 let entries = envelope
2086 .payload
2087 .paths
2088 .into_iter()
2089 .map(|path| RepoPath::new(&path))
2090 .collect::<Result<Vec<_>>>()?;
2091
2092 repository_handle
2093 .update(&mut cx, |repository_handle, cx| {
2094 repository_handle.stash_entries(entries, cx)
2095 })
2096 .await?;
2097
2098 Ok(proto::Ack {})
2099 }
2100
2101 async fn handle_stash_pop(
2102 this: Entity<Self>,
2103 envelope: TypedEnvelope<proto::StashPop>,
2104 mut cx: AsyncApp,
2105 ) -> Result<proto::Ack> {
2106 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2107 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2108 let stash_index = envelope.payload.stash_index.map(|i| i as usize);
2109
2110 repository_handle
2111 .update(&mut cx, |repository_handle, cx| {
2112 repository_handle.stash_pop(stash_index, cx)
2113 })
2114 .await?;
2115
2116 Ok(proto::Ack {})
2117 }
2118
2119 async fn handle_stash_apply(
2120 this: Entity<Self>,
2121 envelope: TypedEnvelope<proto::StashApply>,
2122 mut cx: AsyncApp,
2123 ) -> Result<proto::Ack> {
2124 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2125 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2126 let stash_index = envelope.payload.stash_index.map(|i| i as usize);
2127
2128 repository_handle
2129 .update(&mut cx, |repository_handle, cx| {
2130 repository_handle.stash_apply(stash_index, cx)
2131 })
2132 .await?;
2133
2134 Ok(proto::Ack {})
2135 }
2136
2137 async fn handle_stash_drop(
2138 this: Entity<Self>,
2139 envelope: TypedEnvelope<proto::StashDrop>,
2140 mut cx: AsyncApp,
2141 ) -> Result<proto::Ack> {
2142 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2143 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2144 let stash_index = envelope.payload.stash_index.map(|i| i as usize);
2145
2146 repository_handle
2147 .update(&mut cx, |repository_handle, cx| {
2148 repository_handle.stash_drop(stash_index, cx)
2149 })
2150 .await??;
2151
2152 Ok(proto::Ack {})
2153 }
2154
2155 async fn handle_set_index_text(
2156 this: Entity<Self>,
2157 envelope: TypedEnvelope<proto::SetIndexText>,
2158 mut cx: AsyncApp,
2159 ) -> Result<proto::Ack> {
2160 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2161 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2162 let repo_path = RepoPath::from_proto(&envelope.payload.path)?;
2163
2164 repository_handle
2165 .update(&mut cx, |repository_handle, cx| {
2166 repository_handle.spawn_set_index_text_job(
2167 repo_path,
2168 envelope.payload.text,
2169 None,
2170 cx,
2171 )
2172 })
2173 .await??;
2174 Ok(proto::Ack {})
2175 }
2176
2177 async fn handle_run_hook(
2178 this: Entity<Self>,
2179 envelope: TypedEnvelope<proto::RunGitHook>,
2180 mut cx: AsyncApp,
2181 ) -> Result<proto::Ack> {
2182 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2183 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2184 let hook = RunHook::from_proto(envelope.payload.hook).context("invalid hook")?;
2185 repository_handle
2186 .update(&mut cx, |repository_handle, cx| {
2187 repository_handle.run_hook(hook, cx)
2188 })
2189 .await??;
2190 Ok(proto::Ack {})
2191 }
2192
2193 async fn handle_commit(
2194 this: Entity<Self>,
2195 envelope: TypedEnvelope<proto::Commit>,
2196 mut cx: AsyncApp,
2197 ) -> Result<proto::Ack> {
2198 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2199 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2200 let askpass_id = envelope.payload.askpass_id;
2201
2202 let askpass = make_remote_delegate(
2203 this,
2204 envelope.payload.project_id,
2205 repository_id,
2206 askpass_id,
2207 &mut cx,
2208 );
2209
2210 let message = SharedString::from(envelope.payload.message);
2211 let name = envelope.payload.name.map(SharedString::from);
2212 let email = envelope.payload.email.map(SharedString::from);
2213 let options = envelope.payload.options.unwrap_or_default();
2214
2215 repository_handle
2216 .update(&mut cx, |repository_handle, cx| {
2217 repository_handle.commit(
2218 message,
2219 name.zip(email),
2220 CommitOptions {
2221 amend: options.amend,
2222 signoff: options.signoff,
2223 },
2224 askpass,
2225 cx,
2226 )
2227 })
2228 .await??;
2229 Ok(proto::Ack {})
2230 }
2231
2232 async fn handle_get_remotes(
2233 this: Entity<Self>,
2234 envelope: TypedEnvelope<proto::GetRemotes>,
2235 mut cx: AsyncApp,
2236 ) -> Result<proto::GetRemotesResponse> {
2237 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2238 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2239
2240 let branch_name = envelope.payload.branch_name;
2241 let is_push = envelope.payload.is_push;
2242
2243 let remotes = repository_handle
2244 .update(&mut cx, |repository_handle, _| {
2245 repository_handle.get_remotes(branch_name, is_push)
2246 })
2247 .await??;
2248
2249 Ok(proto::GetRemotesResponse {
2250 remotes: remotes
2251 .into_iter()
2252 .map(|remotes| proto::get_remotes_response::Remote {
2253 name: remotes.name.to_string(),
2254 })
2255 .collect::<Vec<_>>(),
2256 })
2257 }
2258
2259 async fn handle_get_worktrees(
2260 this: Entity<Self>,
2261 envelope: TypedEnvelope<proto::GitGetWorktrees>,
2262 mut cx: AsyncApp,
2263 ) -> Result<proto::GitWorktreesResponse> {
2264 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2265 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2266
2267 let worktrees = repository_handle
2268 .update(&mut cx, |repository_handle, _| {
2269 repository_handle.worktrees()
2270 })
2271 .await??;
2272
2273 Ok(proto::GitWorktreesResponse {
2274 worktrees: worktrees
2275 .into_iter()
2276 .map(|worktree| worktree_to_proto(&worktree))
2277 .collect::<Vec<_>>(),
2278 })
2279 }
2280
2281 async fn handle_create_worktree(
2282 this: Entity<Self>,
2283 envelope: TypedEnvelope<proto::GitCreateWorktree>,
2284 mut cx: AsyncApp,
2285 ) -> Result<proto::Ack> {
2286 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2287 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2288 let directory = PathBuf::from(envelope.payload.directory);
2289 let name = envelope.payload.name;
2290 let commit = envelope.payload.commit;
2291
2292 repository_handle
2293 .update(&mut cx, |repository_handle, _| {
2294 repository_handle.create_worktree(name, directory, commit)
2295 })
2296 .await??;
2297
2298 Ok(proto::Ack {})
2299 }
2300
2301 async fn handle_get_branches(
2302 this: Entity<Self>,
2303 envelope: TypedEnvelope<proto::GitGetBranches>,
2304 mut cx: AsyncApp,
2305 ) -> Result<proto::GitBranchesResponse> {
2306 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2307 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2308
2309 let branches = repository_handle
2310 .update(&mut cx, |repository_handle, _| repository_handle.branches())
2311 .await??;
2312
2313 Ok(proto::GitBranchesResponse {
2314 branches: branches
2315 .into_iter()
2316 .map(|branch| branch_to_proto(&branch))
2317 .collect::<Vec<_>>(),
2318 })
2319 }
2320 async fn handle_get_default_branch(
2321 this: Entity<Self>,
2322 envelope: TypedEnvelope<proto::GetDefaultBranch>,
2323 mut cx: AsyncApp,
2324 ) -> Result<proto::GetDefaultBranchResponse> {
2325 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2326 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2327
2328 let branch = repository_handle
2329 .update(&mut cx, |repository_handle, _| {
2330 repository_handle.default_branch(false)
2331 })
2332 .await??
2333 .map(Into::into);
2334
2335 Ok(proto::GetDefaultBranchResponse { branch })
2336 }
2337 async fn handle_create_branch(
2338 this: Entity<Self>,
2339 envelope: TypedEnvelope<proto::GitCreateBranch>,
2340 mut cx: AsyncApp,
2341 ) -> Result<proto::Ack> {
2342 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2343 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2344 let branch_name = envelope.payload.branch_name;
2345
2346 repository_handle
2347 .update(&mut cx, |repository_handle, _| {
2348 repository_handle.create_branch(branch_name, None)
2349 })
2350 .await??;
2351
2352 Ok(proto::Ack {})
2353 }
2354
2355 async fn handle_change_branch(
2356 this: Entity<Self>,
2357 envelope: TypedEnvelope<proto::GitChangeBranch>,
2358 mut cx: AsyncApp,
2359 ) -> Result<proto::Ack> {
2360 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2361 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2362 let branch_name = envelope.payload.branch_name;
2363
2364 repository_handle
2365 .update(&mut cx, |repository_handle, _| {
2366 repository_handle.change_branch(branch_name)
2367 })
2368 .await??;
2369
2370 Ok(proto::Ack {})
2371 }
2372
2373 async fn handle_rename_branch(
2374 this: Entity<Self>,
2375 envelope: TypedEnvelope<proto::GitRenameBranch>,
2376 mut cx: AsyncApp,
2377 ) -> Result<proto::Ack> {
2378 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2379 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2380 let branch = envelope.payload.branch;
2381 let new_name = envelope.payload.new_name;
2382
2383 repository_handle
2384 .update(&mut cx, |repository_handle, _| {
2385 repository_handle.rename_branch(branch, new_name)
2386 })
2387 .await??;
2388
2389 Ok(proto::Ack {})
2390 }
2391
2392 async fn handle_create_remote(
2393 this: Entity<Self>,
2394 envelope: TypedEnvelope<proto::GitCreateRemote>,
2395 mut cx: AsyncApp,
2396 ) -> Result<proto::Ack> {
2397 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2398 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2399 let remote_name = envelope.payload.remote_name;
2400 let remote_url = envelope.payload.remote_url;
2401
2402 repository_handle
2403 .update(&mut cx, |repository_handle, _| {
2404 repository_handle.create_remote(remote_name, remote_url)
2405 })
2406 .await??;
2407
2408 Ok(proto::Ack {})
2409 }
2410
2411 async fn handle_delete_branch(
2412 this: Entity<Self>,
2413 envelope: TypedEnvelope<proto::GitDeleteBranch>,
2414 mut cx: AsyncApp,
2415 ) -> Result<proto::Ack> {
2416 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2417 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2418 let branch_name = envelope.payload.branch_name;
2419
2420 repository_handle
2421 .update(&mut cx, |repository_handle, _| {
2422 repository_handle.delete_branch(branch_name)
2423 })
2424 .await??;
2425
2426 Ok(proto::Ack {})
2427 }
2428
2429 async fn handle_remove_remote(
2430 this: Entity<Self>,
2431 envelope: TypedEnvelope<proto::GitRemoveRemote>,
2432 mut cx: AsyncApp,
2433 ) -> Result<proto::Ack> {
2434 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2435 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2436 let remote_name = envelope.payload.remote_name;
2437
2438 repository_handle
2439 .update(&mut cx, |repository_handle, _| {
2440 repository_handle.remove_remote(remote_name)
2441 })
2442 .await??;
2443
2444 Ok(proto::Ack {})
2445 }
2446
2447 async fn handle_show(
2448 this: Entity<Self>,
2449 envelope: TypedEnvelope<proto::GitShow>,
2450 mut cx: AsyncApp,
2451 ) -> Result<proto::GitCommitDetails> {
2452 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2453 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2454
2455 let commit = repository_handle
2456 .update(&mut cx, |repository_handle, _| {
2457 repository_handle.show(envelope.payload.commit)
2458 })
2459 .await??;
2460 Ok(proto::GitCommitDetails {
2461 sha: commit.sha.into(),
2462 message: commit.message.into(),
2463 commit_timestamp: commit.commit_timestamp,
2464 author_email: commit.author_email.into(),
2465 author_name: commit.author_name.into(),
2466 })
2467 }
2468
2469 async fn handle_load_commit_diff(
2470 this: Entity<Self>,
2471 envelope: TypedEnvelope<proto::LoadCommitDiff>,
2472 mut cx: AsyncApp,
2473 ) -> Result<proto::LoadCommitDiffResponse> {
2474 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2475 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2476
2477 let commit_diff = repository_handle
2478 .update(&mut cx, |repository_handle, _| {
2479 repository_handle.load_commit_diff(envelope.payload.commit)
2480 })
2481 .await??;
2482 Ok(proto::LoadCommitDiffResponse {
2483 files: commit_diff
2484 .files
2485 .into_iter()
2486 .map(|file| proto::CommitFile {
2487 path: file.path.to_proto(),
2488 old_text: file.old_text,
2489 new_text: file.new_text,
2490 is_binary: file.is_binary,
2491 })
2492 .collect(),
2493 })
2494 }
2495
2496 async fn handle_file_history(
2497 this: Entity<Self>,
2498 envelope: TypedEnvelope<proto::GitFileHistory>,
2499 mut cx: AsyncApp,
2500 ) -> Result<proto::GitFileHistoryResponse> {
2501 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2502 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2503 let path = RepoPath::from_proto(&envelope.payload.path)?;
2504 let skip = envelope.payload.skip as usize;
2505 let limit = envelope.payload.limit.map(|l| l as usize);
2506
2507 let file_history = repository_handle
2508 .update(&mut cx, |repository_handle, _| {
2509 repository_handle.file_history_paginated(path, skip, limit)
2510 })
2511 .await??;
2512
2513 Ok(proto::GitFileHistoryResponse {
2514 entries: file_history
2515 .entries
2516 .into_iter()
2517 .map(|entry| proto::FileHistoryEntry {
2518 sha: entry.sha.to_string(),
2519 subject: entry.subject.to_string(),
2520 message: entry.message.to_string(),
2521 commit_timestamp: entry.commit_timestamp,
2522 author_name: entry.author_name.to_string(),
2523 author_email: entry.author_email.to_string(),
2524 })
2525 .collect(),
2526 path: file_history.path.to_proto(),
2527 })
2528 }
2529
2530 async fn handle_reset(
2531 this: Entity<Self>,
2532 envelope: TypedEnvelope<proto::GitReset>,
2533 mut cx: AsyncApp,
2534 ) -> Result<proto::Ack> {
2535 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2536 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2537
2538 let mode = match envelope.payload.mode() {
2539 git_reset::ResetMode::Soft => ResetMode::Soft,
2540 git_reset::ResetMode::Mixed => ResetMode::Mixed,
2541 };
2542
2543 repository_handle
2544 .update(&mut cx, |repository_handle, cx| {
2545 repository_handle.reset(envelope.payload.commit, mode, cx)
2546 })
2547 .await??;
2548 Ok(proto::Ack {})
2549 }
2550
2551 async fn handle_checkout_files(
2552 this: Entity<Self>,
2553 envelope: TypedEnvelope<proto::GitCheckoutFiles>,
2554 mut cx: AsyncApp,
2555 ) -> Result<proto::Ack> {
2556 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2557 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2558 let paths = envelope
2559 .payload
2560 .paths
2561 .iter()
2562 .map(|s| RepoPath::from_proto(s))
2563 .collect::<Result<Vec<_>>>()?;
2564
2565 repository_handle
2566 .update(&mut cx, |repository_handle, cx| {
2567 repository_handle.checkout_files(&envelope.payload.commit, paths, cx)
2568 })
2569 .await?;
2570 Ok(proto::Ack {})
2571 }
2572
2573 async fn handle_open_commit_message_buffer(
2574 this: Entity<Self>,
2575 envelope: TypedEnvelope<proto::OpenCommitMessageBuffer>,
2576 mut cx: AsyncApp,
2577 ) -> Result<proto::OpenBufferResponse> {
2578 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2579 let repository = Self::repository_for_request(&this, repository_id, &mut cx)?;
2580 let buffer = repository
2581 .update(&mut cx, |repository, cx| {
2582 repository.open_commit_buffer(None, this.read(cx).buffer_store.clone(), cx)
2583 })
2584 .await?;
2585
2586 let buffer_id = buffer.read_with(&cx, |buffer, _| buffer.remote_id());
2587 this.update(&mut cx, |this, cx| {
2588 this.buffer_store.update(cx, |buffer_store, cx| {
2589 buffer_store
2590 .create_buffer_for_peer(
2591 &buffer,
2592 envelope.original_sender_id.unwrap_or(envelope.sender_id),
2593 cx,
2594 )
2595 .detach_and_log_err(cx);
2596 })
2597 });
2598
2599 Ok(proto::OpenBufferResponse {
2600 buffer_id: buffer_id.to_proto(),
2601 })
2602 }
2603
2604 async fn handle_askpass(
2605 this: Entity<Self>,
2606 envelope: TypedEnvelope<proto::AskPassRequest>,
2607 mut cx: AsyncApp,
2608 ) -> Result<proto::AskPassResponse> {
2609 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2610 let repository = Self::repository_for_request(&this, repository_id, &mut cx)?;
2611
2612 let delegates = cx.update(|cx| repository.read(cx).askpass_delegates.clone());
2613 let Some(mut askpass) = delegates.lock().remove(&envelope.payload.askpass_id) else {
2614 debug_panic!("no askpass found");
2615 anyhow::bail!("no askpass found");
2616 };
2617
2618 let response = askpass
2619 .ask_password(envelope.payload.prompt)
2620 .await
2621 .ok_or_else(|| anyhow::anyhow!("askpass cancelled"))?;
2622
2623 delegates
2624 .lock()
2625 .insert(envelope.payload.askpass_id, askpass);
2626
2627 // In fact, we don't quite know what we're doing here, as we're sending askpass password unencrypted, but..
2628 Ok(proto::AskPassResponse {
2629 response: response.decrypt(IKnowWhatIAmDoingAndIHaveReadTheDocs)?,
2630 })
2631 }
2632
2633 async fn handle_check_for_pushed_commits(
2634 this: Entity<Self>,
2635 envelope: TypedEnvelope<proto::CheckForPushedCommits>,
2636 mut cx: AsyncApp,
2637 ) -> Result<proto::CheckForPushedCommitsResponse> {
2638 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2639 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2640
2641 let branches = repository_handle
2642 .update(&mut cx, |repository_handle, _| {
2643 repository_handle.check_for_pushed_commits()
2644 })
2645 .await??;
2646 Ok(proto::CheckForPushedCommitsResponse {
2647 pushed_to: branches
2648 .into_iter()
2649 .map(|commit| commit.to_string())
2650 .collect(),
2651 })
2652 }
2653
2654 async fn handle_git_diff(
2655 this: Entity<Self>,
2656 envelope: TypedEnvelope<proto::GitDiff>,
2657 mut cx: AsyncApp,
2658 ) -> Result<proto::GitDiffResponse> {
2659 let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
2660 let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
2661 let diff_type = match envelope.payload.diff_type() {
2662 proto::git_diff::DiffType::HeadToIndex => DiffType::HeadToIndex,
2663 proto::git_diff::DiffType::HeadToWorktree => DiffType::HeadToWorktree,
2664 };
2665
2666 let mut diff = repository_handle
2667 .update(&mut cx, |repository_handle, cx| {
2668 repository_handle.diff(diff_type, cx)
2669 })
2670 .await??;
2671 const ONE_MB: usize = 1_000_000;
2672 if diff.len() > ONE_MB {
2673 diff = diff.chars().take(ONE_MB).collect()
2674 }
2675
2676 Ok(proto::GitDiffResponse { diff })
2677 }
2678
2679 async fn handle_tree_diff(
2680 this: Entity<Self>,
2681 request: TypedEnvelope<proto::GetTreeDiff>,
2682 mut cx: AsyncApp,
2683 ) -> Result<proto::GetTreeDiffResponse> {
2684 let repository_id = RepositoryId(request.payload.repository_id);
2685 let diff_type = if request.payload.is_merge {
2686 DiffTreeType::MergeBase {
2687 base: request.payload.base.into(),
2688 head: request.payload.head.into(),
2689 }
2690 } else {
2691 DiffTreeType::Since {
2692 base: request.payload.base.into(),
2693 head: request.payload.head.into(),
2694 }
2695 };
2696
2697 let diff = this
2698 .update(&mut cx, |this, cx| {
2699 let repository = this.repositories().get(&repository_id)?;
2700 Some(repository.update(cx, |repo, cx| repo.diff_tree(diff_type, cx)))
2701 })
2702 .context("missing repository")?
2703 .await??;
2704
2705 Ok(proto::GetTreeDiffResponse {
2706 entries: diff
2707 .entries
2708 .into_iter()
2709 .map(|(path, status)| proto::TreeDiffStatus {
2710 path: path.as_ref().to_proto(),
2711 status: match status {
2712 TreeDiffStatus::Added {} => proto::tree_diff_status::Status::Added.into(),
2713 TreeDiffStatus::Modified { .. } => {
2714 proto::tree_diff_status::Status::Modified.into()
2715 }
2716 TreeDiffStatus::Deleted { .. } => {
2717 proto::tree_diff_status::Status::Deleted.into()
2718 }
2719 },
2720 oid: match status {
2721 TreeDiffStatus::Deleted { old } | TreeDiffStatus::Modified { old } => {
2722 Some(old.to_string())
2723 }
2724 TreeDiffStatus::Added => None,
2725 },
2726 })
2727 .collect(),
2728 })
2729 }
2730
2731 async fn handle_get_blob_content(
2732 this: Entity<Self>,
2733 request: TypedEnvelope<proto::GetBlobContent>,
2734 mut cx: AsyncApp,
2735 ) -> Result<proto::GetBlobContentResponse> {
2736 let oid = git::Oid::from_str(&request.payload.oid)?;
2737 let repository_id = RepositoryId(request.payload.repository_id);
2738 let content = this
2739 .update(&mut cx, |this, cx| {
2740 let repository = this.repositories().get(&repository_id)?;
2741 Some(repository.update(cx, |repo, cx| repo.load_blob_content(oid, cx)))
2742 })
2743 .context("missing repository")?
2744 .await?;
2745 Ok(proto::GetBlobContentResponse { content })
2746 }
2747
2748 async fn handle_open_unstaged_diff(
2749 this: Entity<Self>,
2750 request: TypedEnvelope<proto::OpenUnstagedDiff>,
2751 mut cx: AsyncApp,
2752 ) -> Result<proto::OpenUnstagedDiffResponse> {
2753 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2754 let diff = this
2755 .update(&mut cx, |this, cx| {
2756 let buffer = this.buffer_store.read(cx).get(buffer_id)?;
2757 Some(this.open_unstaged_diff(buffer, cx))
2758 })
2759 .context("missing buffer")?
2760 .await?;
2761 this.update(&mut cx, |this, _| {
2762 let shared_diffs = this
2763 .shared_diffs
2764 .entry(request.original_sender_id.unwrap_or(request.sender_id))
2765 .or_default();
2766 shared_diffs.entry(buffer_id).or_default().unstaged = Some(diff.clone());
2767 });
2768 let staged_text = diff.read_with(&cx, |diff, cx| diff.base_text_string(cx));
2769 Ok(proto::OpenUnstagedDiffResponse { staged_text })
2770 }
2771
2772 async fn handle_open_uncommitted_diff(
2773 this: Entity<Self>,
2774 request: TypedEnvelope<proto::OpenUncommittedDiff>,
2775 mut cx: AsyncApp,
2776 ) -> Result<proto::OpenUncommittedDiffResponse> {
2777 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2778 let diff = this
2779 .update(&mut cx, |this, cx| {
2780 let buffer = this.buffer_store.read(cx).get(buffer_id)?;
2781 Some(this.open_uncommitted_diff(buffer, cx))
2782 })
2783 .context("missing buffer")?
2784 .await?;
2785 this.update(&mut cx, |this, _| {
2786 let shared_diffs = this
2787 .shared_diffs
2788 .entry(request.original_sender_id.unwrap_or(request.sender_id))
2789 .or_default();
2790 shared_diffs.entry(buffer_id).or_default().uncommitted = Some(diff.clone());
2791 });
2792 Ok(diff.read_with(&cx, |diff, cx| {
2793 use proto::open_uncommitted_diff_response::Mode;
2794
2795 let unstaged_diff = diff.secondary_diff();
2796 let index_snapshot = unstaged_diff.and_then(|diff| {
2797 let diff = diff.read(cx);
2798 diff.base_text_exists().then(|| diff.base_text(cx))
2799 });
2800
2801 let mode;
2802 let staged_text;
2803 let committed_text;
2804 if diff.base_text_exists() {
2805 let committed_snapshot = diff.base_text(cx);
2806 committed_text = Some(committed_snapshot.text());
2807 if let Some(index_text) = index_snapshot {
2808 if index_text.remote_id() == committed_snapshot.remote_id() {
2809 mode = Mode::IndexMatchesHead;
2810 staged_text = None;
2811 } else {
2812 mode = Mode::IndexAndHead;
2813 staged_text = Some(index_text.text());
2814 }
2815 } else {
2816 mode = Mode::IndexAndHead;
2817 staged_text = None;
2818 }
2819 } else {
2820 mode = Mode::IndexAndHead;
2821 committed_text = None;
2822 staged_text = index_snapshot.as_ref().map(|buffer| buffer.text());
2823 }
2824
2825 proto::OpenUncommittedDiffResponse {
2826 committed_text,
2827 staged_text,
2828 mode: mode.into(),
2829 }
2830 }))
2831 }
2832
2833 async fn handle_update_diff_bases(
2834 this: Entity<Self>,
2835 request: TypedEnvelope<proto::UpdateDiffBases>,
2836 mut cx: AsyncApp,
2837 ) -> Result<()> {
2838 let buffer_id = BufferId::new(request.payload.buffer_id)?;
2839 this.update(&mut cx, |this, cx| {
2840 if let Some(diff_state) = this.diffs.get_mut(&buffer_id)
2841 && let Some(buffer) = this.buffer_store.read(cx).get(buffer_id)
2842 {
2843 let buffer = buffer.read(cx).text_snapshot();
2844 diff_state.update(cx, |diff_state, cx| {
2845 diff_state.handle_base_texts_updated(buffer, request.payload, cx);
2846 })
2847 }
2848 });
2849 Ok(())
2850 }
2851
2852 async fn handle_blame_buffer(
2853 this: Entity<Self>,
2854 envelope: TypedEnvelope<proto::BlameBuffer>,
2855 mut cx: AsyncApp,
2856 ) -> Result<proto::BlameBufferResponse> {
2857 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2858 let version = deserialize_version(&envelope.payload.version);
2859 let buffer = this.read_with(&cx, |this, cx| {
2860 this.buffer_store.read(cx).get_existing(buffer_id)
2861 })?;
2862 buffer
2863 .update(&mut cx, |buffer, _| {
2864 buffer.wait_for_version(version.clone())
2865 })
2866 .await?;
2867 let blame = this
2868 .update(&mut cx, |this, cx| {
2869 this.blame_buffer(&buffer, Some(version), cx)
2870 })
2871 .await?;
2872 Ok(serialize_blame_buffer_response(blame))
2873 }
2874
2875 async fn handle_get_permalink_to_line(
2876 this: Entity<Self>,
2877 envelope: TypedEnvelope<proto::GetPermalinkToLine>,
2878 mut cx: AsyncApp,
2879 ) -> Result<proto::GetPermalinkToLineResponse> {
2880 let buffer_id = BufferId::new(envelope.payload.buffer_id)?;
2881 // let version = deserialize_version(&envelope.payload.version);
2882 let selection = {
2883 let proto_selection = envelope
2884 .payload
2885 .selection
2886 .context("no selection to get permalink for defined")?;
2887 proto_selection.start as u32..proto_selection.end as u32
2888 };
2889 let buffer = this.read_with(&cx, |this, cx| {
2890 this.buffer_store.read(cx).get_existing(buffer_id)
2891 })?;
2892 let permalink = this
2893 .update(&mut cx, |this, cx| {
2894 this.get_permalink_to_line(&buffer, selection, cx)
2895 })
2896 .await?;
2897 Ok(proto::GetPermalinkToLineResponse {
2898 permalink: permalink.to_string(),
2899 })
2900 }
2901
2902 fn repository_for_request(
2903 this: &Entity<Self>,
2904 id: RepositoryId,
2905 cx: &mut AsyncApp,
2906 ) -> Result<Entity<Repository>> {
2907 this.read_with(cx, |this, _| {
2908 this.repositories
2909 .get(&id)
2910 .context("missing repository handle")
2911 .cloned()
2912 })
2913 }
2914
2915 pub fn repo_snapshots(&self, cx: &App) -> HashMap<RepositoryId, RepositorySnapshot> {
2916 self.repositories
2917 .iter()
2918 .map(|(id, repo)| (*id, repo.read(cx).snapshot.clone()))
2919 .collect()
2920 }
2921
2922 fn process_updated_entries(
2923 &self,
2924 worktree: &Entity<Worktree>,
2925 updated_entries: &[(Arc<RelPath>, ProjectEntryId, PathChange)],
2926 cx: &mut App,
2927 ) -> Task<HashMap<Entity<Repository>, Vec<RepoPath>>> {
2928 let path_style = worktree.read(cx).path_style();
2929 let mut repo_paths = self
2930 .repositories
2931 .values()
2932 .map(|repo| (repo.read(cx).work_directory_abs_path.clone(), repo.clone()))
2933 .collect::<Vec<_>>();
2934 let mut entries: Vec<_> = updated_entries
2935 .iter()
2936 .map(|(path, _, _)| path.clone())
2937 .collect();
2938 entries.sort();
2939 let worktree = worktree.read(cx);
2940
2941 let entries = entries
2942 .into_iter()
2943 .map(|path| worktree.absolutize(&path))
2944 .collect::<Arc<[_]>>();
2945
2946 let executor = cx.background_executor().clone();
2947 cx.background_executor().spawn(async move {
2948 repo_paths.sort_by(|lhs, rhs| lhs.0.cmp(&rhs.0));
2949 let mut paths_by_git_repo = HashMap::<_, Vec<_>>::default();
2950 let mut tasks = FuturesOrdered::new();
2951 for (repo_path, repo) in repo_paths.into_iter().rev() {
2952 let entries = entries.clone();
2953 let task = executor.spawn(async move {
2954 // Find all repository paths that belong to this repo
2955 let mut ix = entries.partition_point(|path| path < &*repo_path);
2956 if ix == entries.len() {
2957 return None;
2958 };
2959
2960 let mut paths = Vec::new();
2961 // All paths prefixed by a given repo will constitute a continuous range.
2962 while let Some(path) = entries.get(ix)
2963 && let Some(repo_path) = RepositorySnapshot::abs_path_to_repo_path_inner(
2964 &repo_path, path, path_style,
2965 )
2966 {
2967 paths.push((repo_path, ix));
2968 ix += 1;
2969 }
2970 if paths.is_empty() {
2971 None
2972 } else {
2973 Some((repo, paths))
2974 }
2975 });
2976 tasks.push_back(task);
2977 }
2978
2979 // Now, let's filter out the "duplicate" entries that were processed by multiple distinct repos.
2980 let mut path_was_used = vec![false; entries.len()];
2981 let tasks = tasks.collect::<Vec<_>>().await;
2982 // Process tasks from the back: iterating backwards allows us to see more-specific paths first.
2983 // We always want to assign a path to it's innermost repository.
2984 for t in tasks {
2985 let Some((repo, paths)) = t else {
2986 continue;
2987 };
2988 let entry = paths_by_git_repo.entry(repo).or_default();
2989 for (repo_path, ix) in paths {
2990 if path_was_used[ix] {
2991 continue;
2992 }
2993 path_was_used[ix] = true;
2994 entry.push(repo_path);
2995 }
2996 }
2997
2998 paths_by_git_repo
2999 })
3000 }
3001}
3002
3003impl BufferGitState {
3004 fn new(_git_store: WeakEntity<GitStore>) -> Self {
3005 Self {
3006 unstaged_diff: Default::default(),
3007 uncommitted_diff: Default::default(),
3008 oid_diffs: Default::default(),
3009 recalculate_diff_task: Default::default(),
3010 language: Default::default(),
3011 language_registry: Default::default(),
3012 recalculating_tx: postage::watch::channel_with(false).0,
3013 hunk_staging_operation_count: 0,
3014 hunk_staging_operation_count_as_of_write: 0,
3015 head_text: Default::default(),
3016 index_text: Default::default(),
3017 oid_texts: Default::default(),
3018 head_changed: Default::default(),
3019 index_changed: Default::default(),
3020 language_changed: Default::default(),
3021 conflict_updated_futures: Default::default(),
3022 conflict_set: Default::default(),
3023 reparse_conflict_markers_task: Default::default(),
3024 }
3025 }
3026
3027 #[ztracing::instrument(skip_all)]
3028 fn buffer_language_changed(&mut self, buffer: Entity<Buffer>, cx: &mut Context<Self>) {
3029 self.language = buffer.read(cx).language().cloned();
3030 self.language_changed = true;
3031 let _ = self.recalculate_diffs(buffer.read(cx).text_snapshot(), cx);
3032 }
3033
3034 fn reparse_conflict_markers(
3035 &mut self,
3036 buffer: text::BufferSnapshot,
3037 cx: &mut Context<Self>,
3038 ) -> oneshot::Receiver<()> {
3039 let (tx, rx) = oneshot::channel();
3040
3041 let Some(conflict_set) = self
3042 .conflict_set
3043 .as_ref()
3044 .and_then(|conflict_set| conflict_set.upgrade())
3045 else {
3046 return rx;
3047 };
3048
3049 let old_snapshot = conflict_set.read_with(cx, |conflict_set, _| {
3050 if conflict_set.has_conflict {
3051 Some(conflict_set.snapshot())
3052 } else {
3053 None
3054 }
3055 });
3056
3057 if let Some(old_snapshot) = old_snapshot {
3058 self.conflict_updated_futures.push(tx);
3059 self.reparse_conflict_markers_task = Some(cx.spawn(async move |this, cx| {
3060 let (snapshot, changed_range) = cx
3061 .background_spawn(async move {
3062 let new_snapshot = ConflictSet::parse(&buffer);
3063 let changed_range = old_snapshot.compare(&new_snapshot, &buffer);
3064 (new_snapshot, changed_range)
3065 })
3066 .await;
3067 this.update(cx, |this, cx| {
3068 if let Some(conflict_set) = &this.conflict_set {
3069 conflict_set
3070 .update(cx, |conflict_set, cx| {
3071 conflict_set.set_snapshot(snapshot, changed_range, cx);
3072 })
3073 .ok();
3074 }
3075 let futures = std::mem::take(&mut this.conflict_updated_futures);
3076 for tx in futures {
3077 tx.send(()).ok();
3078 }
3079 })
3080 }))
3081 }
3082
3083 rx
3084 }
3085
3086 fn unstaged_diff(&self) -> Option<Entity<BufferDiff>> {
3087 self.unstaged_diff.as_ref().and_then(|set| set.upgrade())
3088 }
3089
3090 fn uncommitted_diff(&self) -> Option<Entity<BufferDiff>> {
3091 self.uncommitted_diff.as_ref().and_then(|set| set.upgrade())
3092 }
3093
3094 fn oid_diff(&self, oid: Option<git::Oid>) -> Option<Entity<BufferDiff>> {
3095 self.oid_diffs.get(&oid).and_then(|weak| weak.upgrade())
3096 }
3097
3098 fn handle_base_texts_updated(
3099 &mut self,
3100 buffer: text::BufferSnapshot,
3101 message: proto::UpdateDiffBases,
3102 cx: &mut Context<Self>,
3103 ) {
3104 use proto::update_diff_bases::Mode;
3105
3106 let Some(mode) = Mode::from_i32(message.mode) else {
3107 return;
3108 };
3109
3110 let diff_bases_change = match mode {
3111 Mode::HeadOnly => DiffBasesChange::SetHead(message.committed_text),
3112 Mode::IndexOnly => DiffBasesChange::SetIndex(message.staged_text),
3113 Mode::IndexMatchesHead => DiffBasesChange::SetBoth(message.committed_text),
3114 Mode::IndexAndHead => DiffBasesChange::SetEach {
3115 index: message.staged_text,
3116 head: message.committed_text,
3117 },
3118 };
3119
3120 self.diff_bases_changed(buffer, Some(diff_bases_change), cx);
3121 }
3122
3123 pub fn wait_for_recalculation(&mut self) -> Option<impl Future<Output = ()> + use<>> {
3124 if *self.recalculating_tx.borrow() {
3125 let mut rx = self.recalculating_tx.subscribe();
3126 Some(async move {
3127 loop {
3128 let is_recalculating = rx.recv().await;
3129 if is_recalculating != Some(true) {
3130 break;
3131 }
3132 }
3133 })
3134 } else {
3135 None
3136 }
3137 }
3138
3139 fn diff_bases_changed(
3140 &mut self,
3141 buffer: text::BufferSnapshot,
3142 diff_bases_change: Option<DiffBasesChange>,
3143 cx: &mut Context<Self>,
3144 ) {
3145 match diff_bases_change {
3146 Some(DiffBasesChange::SetIndex(index)) => {
3147 self.index_text = index.map(|mut index| {
3148 text::LineEnding::normalize(&mut index);
3149 Arc::from(index.as_str())
3150 });
3151 self.index_changed = true;
3152 }
3153 Some(DiffBasesChange::SetHead(head)) => {
3154 self.head_text = head.map(|mut head| {
3155 text::LineEnding::normalize(&mut head);
3156 Arc::from(head.as_str())
3157 });
3158 self.head_changed = true;
3159 }
3160 Some(DiffBasesChange::SetBoth(text)) => {
3161 let text = text.map(|mut text| {
3162 text::LineEnding::normalize(&mut text);
3163 Arc::from(text.as_str())
3164 });
3165 self.head_text = text.clone();
3166 self.index_text = text;
3167 self.head_changed = true;
3168 self.index_changed = true;
3169 }
3170 Some(DiffBasesChange::SetEach { index, head }) => {
3171 self.index_text = index.map(|mut index| {
3172 text::LineEnding::normalize(&mut index);
3173 Arc::from(index.as_str())
3174 });
3175 self.index_changed = true;
3176 self.head_text = head.map(|mut head| {
3177 text::LineEnding::normalize(&mut head);
3178 Arc::from(head.as_str())
3179 });
3180 self.head_changed = true;
3181 }
3182 None => {}
3183 }
3184
3185 self.recalculate_diffs(buffer, cx)
3186 }
3187
3188 #[ztracing::instrument(skip_all)]
3189 fn recalculate_diffs(&mut self, buffer: text::BufferSnapshot, cx: &mut Context<Self>) {
3190 *self.recalculating_tx.borrow_mut() = true;
3191
3192 let language = self.language.clone();
3193 let language_registry = self.language_registry.clone();
3194 let unstaged_diff = self.unstaged_diff();
3195 let uncommitted_diff = self.uncommitted_diff();
3196 let head = self.head_text.clone();
3197 let index = self.index_text.clone();
3198 let index_changed = self.index_changed;
3199 let head_changed = self.head_changed;
3200 let language_changed = self.language_changed;
3201 let prev_hunk_staging_operation_count = self.hunk_staging_operation_count_as_of_write;
3202 let index_matches_head = match (self.index_text.as_ref(), self.head_text.as_ref()) {
3203 (Some(index), Some(head)) => Arc::ptr_eq(index, head),
3204 (None, None) => true,
3205 _ => false,
3206 };
3207
3208 let oid_diffs: Vec<(Option<git::Oid>, Entity<BufferDiff>, Option<Arc<str>>)> = self
3209 .oid_diffs
3210 .iter()
3211 .filter_map(|(oid, weak)| {
3212 let base_text = oid.and_then(|oid| self.oid_texts.get(&oid).cloned());
3213 weak.upgrade().map(|diff| (*oid, diff, base_text))
3214 })
3215 .collect();
3216
3217 self.oid_diffs.retain(|oid, weak| {
3218 let alive = weak.upgrade().is_some();
3219 if !alive {
3220 if let Some(oid) = oid {
3221 self.oid_texts.remove(oid);
3222 }
3223 }
3224 alive
3225 });
3226 self.recalculate_diff_task = Some(cx.spawn(async move |this, cx| {
3227 log::debug!(
3228 "start recalculating diffs for buffer {}",
3229 buffer.remote_id()
3230 );
3231
3232 let mut new_unstaged_diff = None;
3233 if let Some(unstaged_diff) = &unstaged_diff {
3234 new_unstaged_diff = Some(
3235 cx.update(|cx| {
3236 unstaged_diff.read(cx).update_diff(
3237 buffer.clone(),
3238 index,
3239 index_changed.then_some(false),
3240 language.clone(),
3241 cx,
3242 )
3243 })
3244 .await,
3245 );
3246 }
3247
3248 // Dropping BufferDiff can be expensive, so yield back to the event loop
3249 // for a bit
3250 yield_now().await;
3251
3252 let mut new_uncommitted_diff = None;
3253 if let Some(uncommitted_diff) = &uncommitted_diff {
3254 new_uncommitted_diff = if index_matches_head {
3255 new_unstaged_diff.clone()
3256 } else {
3257 Some(
3258 cx.update(|cx| {
3259 uncommitted_diff.read(cx).update_diff(
3260 buffer.clone(),
3261 head,
3262 head_changed.then_some(true),
3263 language.clone(),
3264 cx,
3265 )
3266 })
3267 .await,
3268 )
3269 }
3270 }
3271
3272 // Dropping BufferDiff can be expensive, so yield back to the event loop
3273 // for a bit
3274 yield_now().await;
3275
3276 let cancel = this.update(cx, |this, _| {
3277 // This checks whether all pending stage/unstage operations
3278 // have quiesced (i.e. both the corresponding write and the
3279 // read of that write have completed). If not, then we cancel
3280 // this recalculation attempt to avoid invalidating pending
3281 // state too quickly; another recalculation will come along
3282 // later and clear the pending state once the state of the index has settled.
3283 if this.hunk_staging_operation_count > prev_hunk_staging_operation_count {
3284 *this.recalculating_tx.borrow_mut() = false;
3285 true
3286 } else {
3287 false
3288 }
3289 })?;
3290 if cancel {
3291 log::debug!(
3292 concat!(
3293 "aborting recalculating diffs for buffer {}",
3294 "due to subsequent hunk operations",
3295 ),
3296 buffer.remote_id()
3297 );
3298 return Ok(());
3299 }
3300
3301 let unstaged_changed_range = if let Some((unstaged_diff, new_unstaged_diff)) =
3302 unstaged_diff.as_ref().zip(new_unstaged_diff.clone())
3303 {
3304 let task = unstaged_diff.update(cx, |diff, cx| {
3305 // For git index buffer we skip assigning the language as we do not really need to perform any syntax highlighting on
3306 // it. As a result, by skipping it we are potentially shaving off a lot of RSS plus we get a snappier feel for large diff
3307 // view multibuffers.
3308 diff.set_snapshot(new_unstaged_diff, &buffer, cx)
3309 });
3310 Some(task.await)
3311 } else {
3312 None
3313 };
3314
3315 yield_now().await;
3316
3317 if let Some((uncommitted_diff, new_uncommitted_diff)) =
3318 uncommitted_diff.as_ref().zip(new_uncommitted_diff.clone())
3319 {
3320 uncommitted_diff
3321 .update(cx, |diff, cx| {
3322 if language_changed {
3323 diff.language_changed(language.clone(), language_registry, cx);
3324 }
3325 diff.set_snapshot_with_secondary(
3326 new_uncommitted_diff,
3327 &buffer,
3328 unstaged_changed_range.flatten(),
3329 true,
3330 cx,
3331 )
3332 })
3333 .await;
3334 }
3335
3336 yield_now().await;
3337
3338 for (oid, oid_diff, base_text) in oid_diffs {
3339 let new_oid_diff = cx
3340 .update(|cx| {
3341 oid_diff.read(cx).update_diff(
3342 buffer.clone(),
3343 base_text,
3344 None,
3345 language.clone(),
3346 cx,
3347 )
3348 })
3349 .await;
3350
3351 oid_diff
3352 .update(cx, |diff, cx| diff.set_snapshot(new_oid_diff, &buffer, cx))
3353 .await;
3354
3355 log::debug!(
3356 "finished recalculating oid diff for buffer {} oid {:?}",
3357 buffer.remote_id(),
3358 oid
3359 );
3360
3361 yield_now().await;
3362 }
3363
3364 log::debug!(
3365 "finished recalculating diffs for buffer {}",
3366 buffer.remote_id()
3367 );
3368
3369 if let Some(this) = this.upgrade() {
3370 this.update(cx, |this, _| {
3371 this.index_changed = false;
3372 this.head_changed = false;
3373 this.language_changed = false;
3374 *this.recalculating_tx.borrow_mut() = false;
3375 });
3376 }
3377
3378 Ok(())
3379 }));
3380 }
3381}
3382
3383fn make_remote_delegate(
3384 this: Entity<GitStore>,
3385 project_id: u64,
3386 repository_id: RepositoryId,
3387 askpass_id: u64,
3388 cx: &mut AsyncApp,
3389) -> AskPassDelegate {
3390 AskPassDelegate::new(cx, move |prompt, tx, cx| {
3391 this.update(cx, |this, cx| {
3392 let Some((client, _)) = this.downstream_client() else {
3393 return;
3394 };
3395 let response = client.request(proto::AskPassRequest {
3396 project_id,
3397 repository_id: repository_id.to_proto(),
3398 askpass_id,
3399 prompt,
3400 });
3401 cx.spawn(async move |_, _| {
3402 let mut response = response.await?.response;
3403 tx.send(EncryptedPassword::try_from(response.as_ref())?)
3404 .ok();
3405 response.zeroize();
3406 anyhow::Ok(())
3407 })
3408 .detach_and_log_err(cx);
3409 });
3410 })
3411}
3412
3413impl RepositoryId {
3414 pub fn to_proto(self) -> u64 {
3415 self.0
3416 }
3417
3418 pub fn from_proto(id: u64) -> Self {
3419 RepositoryId(id)
3420 }
3421}
3422
3423impl RepositorySnapshot {
3424 fn empty(id: RepositoryId, work_directory_abs_path: Arc<Path>, path_style: PathStyle) -> Self {
3425 Self {
3426 id,
3427 statuses_by_path: Default::default(),
3428 work_directory_abs_path,
3429 branch: None,
3430 head_commit: None,
3431 scan_id: 0,
3432 merge: Default::default(),
3433 remote_origin_url: None,
3434 remote_upstream_url: None,
3435 stash_entries: Default::default(),
3436 path_style,
3437 }
3438 }
3439
3440 fn initial_update(&self, project_id: u64) -> proto::UpdateRepository {
3441 proto::UpdateRepository {
3442 branch_summary: self.branch.as_ref().map(branch_to_proto),
3443 head_commit_details: self.head_commit.as_ref().map(commit_details_to_proto),
3444 updated_statuses: self
3445 .statuses_by_path
3446 .iter()
3447 .map(|entry| entry.to_proto())
3448 .collect(),
3449 removed_statuses: Default::default(),
3450 current_merge_conflicts: self
3451 .merge
3452 .conflicted_paths
3453 .iter()
3454 .map(|repo_path| repo_path.to_proto())
3455 .collect(),
3456 merge_message: self.merge.message.as_ref().map(|msg| msg.to_string()),
3457 project_id,
3458 id: self.id.to_proto(),
3459 abs_path: self.work_directory_abs_path.to_string_lossy().into_owned(),
3460 entry_ids: vec![self.id.to_proto()],
3461 scan_id: self.scan_id,
3462 is_last_update: true,
3463 stash_entries: self
3464 .stash_entries
3465 .entries
3466 .iter()
3467 .map(stash_to_proto)
3468 .collect(),
3469 remote_upstream_url: self.remote_upstream_url.clone(),
3470 remote_origin_url: self.remote_origin_url.clone(),
3471 }
3472 }
3473
3474 fn build_update(&self, old: &Self, project_id: u64) -> proto::UpdateRepository {
3475 let mut updated_statuses: Vec<proto::StatusEntry> = Vec::new();
3476 let mut removed_statuses: Vec<String> = Vec::new();
3477
3478 let mut new_statuses = self.statuses_by_path.iter().peekable();
3479 let mut old_statuses = old.statuses_by_path.iter().peekable();
3480
3481 let mut current_new_entry = new_statuses.next();
3482 let mut current_old_entry = old_statuses.next();
3483 loop {
3484 match (current_new_entry, current_old_entry) {
3485 (Some(new_entry), Some(old_entry)) => {
3486 match new_entry.repo_path.cmp(&old_entry.repo_path) {
3487 Ordering::Less => {
3488 updated_statuses.push(new_entry.to_proto());
3489 current_new_entry = new_statuses.next();
3490 }
3491 Ordering::Equal => {
3492 if new_entry.status != old_entry.status {
3493 updated_statuses.push(new_entry.to_proto());
3494 }
3495 current_old_entry = old_statuses.next();
3496 current_new_entry = new_statuses.next();
3497 }
3498 Ordering::Greater => {
3499 removed_statuses.push(old_entry.repo_path.to_proto());
3500 current_old_entry = old_statuses.next();
3501 }
3502 }
3503 }
3504 (None, Some(old_entry)) => {
3505 removed_statuses.push(old_entry.repo_path.to_proto());
3506 current_old_entry = old_statuses.next();
3507 }
3508 (Some(new_entry), None) => {
3509 updated_statuses.push(new_entry.to_proto());
3510 current_new_entry = new_statuses.next();
3511 }
3512 (None, None) => break,
3513 }
3514 }
3515
3516 proto::UpdateRepository {
3517 branch_summary: self.branch.as_ref().map(branch_to_proto),
3518 head_commit_details: self.head_commit.as_ref().map(commit_details_to_proto),
3519 updated_statuses,
3520 removed_statuses,
3521 current_merge_conflicts: self
3522 .merge
3523 .conflicted_paths
3524 .iter()
3525 .map(|path| path.to_proto())
3526 .collect(),
3527 merge_message: self.merge.message.as_ref().map(|msg| msg.to_string()),
3528 project_id,
3529 id: self.id.to_proto(),
3530 abs_path: self.work_directory_abs_path.to_string_lossy().into_owned(),
3531 entry_ids: vec![],
3532 scan_id: self.scan_id,
3533 is_last_update: true,
3534 stash_entries: self
3535 .stash_entries
3536 .entries
3537 .iter()
3538 .map(stash_to_proto)
3539 .collect(),
3540 remote_upstream_url: self.remote_upstream_url.clone(),
3541 remote_origin_url: self.remote_origin_url.clone(),
3542 }
3543 }
3544
3545 pub fn status(&self) -> impl Iterator<Item = StatusEntry> + '_ {
3546 self.statuses_by_path.iter().cloned()
3547 }
3548
3549 pub fn status_summary(&self) -> GitSummary {
3550 self.statuses_by_path.summary().item_summary
3551 }
3552
3553 pub fn status_for_path(&self, path: &RepoPath) -> Option<StatusEntry> {
3554 self.statuses_by_path
3555 .get(&PathKey(path.as_ref().clone()), ())
3556 .cloned()
3557 }
3558
3559 pub fn abs_path_to_repo_path(&self, abs_path: &Path) -> Option<RepoPath> {
3560 Self::abs_path_to_repo_path_inner(&self.work_directory_abs_path, abs_path, self.path_style)
3561 }
3562
3563 fn repo_path_to_abs_path(&self, repo_path: &RepoPath) -> PathBuf {
3564 self.path_style
3565 .join(&self.work_directory_abs_path, repo_path.as_std_path())
3566 .unwrap()
3567 .into()
3568 }
3569
3570 #[inline]
3571 fn abs_path_to_repo_path_inner(
3572 work_directory_abs_path: &Path,
3573 abs_path: &Path,
3574 path_style: PathStyle,
3575 ) -> Option<RepoPath> {
3576 let rel_path = path_style.strip_prefix(abs_path, work_directory_abs_path)?;
3577 Some(RepoPath::from_rel_path(&rel_path))
3578 }
3579
3580 pub fn had_conflict_on_last_merge_head_change(&self, repo_path: &RepoPath) -> bool {
3581 self.merge.conflicted_paths.contains(repo_path)
3582 }
3583
3584 pub fn has_conflict(&self, repo_path: &RepoPath) -> bool {
3585 let had_conflict_on_last_merge_head_change =
3586 self.merge.conflicted_paths.contains(repo_path);
3587 let has_conflict_currently = self
3588 .status_for_path(repo_path)
3589 .is_some_and(|entry| entry.status.is_conflicted());
3590 had_conflict_on_last_merge_head_change || has_conflict_currently
3591 }
3592
3593 /// This is the name that will be displayed in the repository selector for this repository.
3594 pub fn display_name(&self) -> SharedString {
3595 self.work_directory_abs_path
3596 .file_name()
3597 .unwrap_or_default()
3598 .to_string_lossy()
3599 .to_string()
3600 .into()
3601 }
3602}
3603
3604pub fn stash_to_proto(entry: &StashEntry) -> proto::StashEntry {
3605 proto::StashEntry {
3606 oid: entry.oid.as_bytes().to_vec(),
3607 message: entry.message.clone(),
3608 branch: entry.branch.clone(),
3609 index: entry.index as u64,
3610 timestamp: entry.timestamp,
3611 }
3612}
3613
3614pub fn proto_to_stash(entry: &proto::StashEntry) -> Result<StashEntry> {
3615 Ok(StashEntry {
3616 oid: Oid::from_bytes(&entry.oid)?,
3617 message: entry.message.clone(),
3618 index: entry.index as usize,
3619 branch: entry.branch.clone(),
3620 timestamp: entry.timestamp,
3621 })
3622}
3623
3624impl MergeDetails {
3625 async fn load(
3626 backend: &Arc<dyn GitRepository>,
3627 status: &SumTree<StatusEntry>,
3628 prev_snapshot: &RepositorySnapshot,
3629 ) -> Result<(MergeDetails, bool)> {
3630 log::debug!("load merge details");
3631 let message = backend.merge_message().await;
3632 let heads = backend
3633 .revparse_batch(vec![
3634 "MERGE_HEAD".into(),
3635 "CHERRY_PICK_HEAD".into(),
3636 "REBASE_HEAD".into(),
3637 "REVERT_HEAD".into(),
3638 "APPLY_HEAD".into(),
3639 ])
3640 .await
3641 .log_err()
3642 .unwrap_or_default()
3643 .into_iter()
3644 .map(|opt| opt.map(SharedString::from))
3645 .collect::<Vec<_>>();
3646 let merge_heads_changed = heads != prev_snapshot.merge.heads;
3647 let conflicted_paths = if merge_heads_changed {
3648 let current_conflicted_paths = TreeSet::from_ordered_entries(
3649 status
3650 .iter()
3651 .filter(|entry| entry.status.is_conflicted())
3652 .map(|entry| entry.repo_path.clone()),
3653 );
3654
3655 // It can happen that we run a scan while a lengthy merge is in progress
3656 // that will eventually result in conflicts, but before those conflicts
3657 // are reported by `git status`. Since for the moment we only care about
3658 // the merge heads state for the purposes of tracking conflicts, don't update
3659 // this state until we see some conflicts.
3660 if heads.iter().any(Option::is_some)
3661 && !prev_snapshot.merge.heads.iter().any(Option::is_some)
3662 && current_conflicted_paths.is_empty()
3663 {
3664 log::debug!("not updating merge heads because no conflicts found");
3665 return Ok((
3666 MergeDetails {
3667 message: message.map(SharedString::from),
3668 ..prev_snapshot.merge.clone()
3669 },
3670 false,
3671 ));
3672 }
3673
3674 current_conflicted_paths
3675 } else {
3676 prev_snapshot.merge.conflicted_paths.clone()
3677 };
3678 let details = MergeDetails {
3679 conflicted_paths,
3680 message: message.map(SharedString::from),
3681 heads,
3682 };
3683 Ok((details, merge_heads_changed))
3684 }
3685}
3686
3687impl Repository {
3688 pub fn snapshot(&self) -> RepositorySnapshot {
3689 self.snapshot.clone()
3690 }
3691
3692 pub fn pending_ops(&self) -> impl Iterator<Item = PendingOps> + '_ {
3693 self.pending_ops.iter().cloned()
3694 }
3695
3696 pub fn pending_ops_summary(&self) -> PathSummary<PendingOpsSummary> {
3697 self.pending_ops.summary().clone()
3698 }
3699
3700 pub fn pending_ops_for_path(&self, path: &RepoPath) -> Option<PendingOps> {
3701 self.pending_ops
3702 .get(&PathKey(path.as_ref().clone()), ())
3703 .cloned()
3704 }
3705
3706 fn local(
3707 id: RepositoryId,
3708 work_directory_abs_path: Arc<Path>,
3709 dot_git_abs_path: Arc<Path>,
3710 project_environment: WeakEntity<ProjectEnvironment>,
3711 fs: Arc<dyn Fs>,
3712 git_store: WeakEntity<GitStore>,
3713 cx: &mut Context<Self>,
3714 ) -> Self {
3715 let snapshot =
3716 RepositorySnapshot::empty(id, work_directory_abs_path.clone(), PathStyle::local());
3717 let state = cx
3718 .spawn(async move |_, cx| {
3719 LocalRepositoryState::new(
3720 work_directory_abs_path,
3721 dot_git_abs_path,
3722 project_environment,
3723 fs,
3724 cx,
3725 )
3726 .await
3727 .map_err(|err| err.to_string())
3728 })
3729 .shared();
3730 let job_sender = Repository::spawn_local_git_worker(state.clone(), cx);
3731 let state = cx
3732 .spawn(async move |_, _| {
3733 let state = state.await?;
3734 Ok(RepositoryState::Local(state))
3735 })
3736 .shared();
3737
3738 cx.subscribe_self(|this, event: &RepositoryEvent, _| match event {
3739 RepositoryEvent::BranchChanged | RepositoryEvent::MergeHeadsChanged => {
3740 this.initial_graph_data.clear();
3741 }
3742 _ => {}
3743 })
3744 .detach();
3745
3746 Repository {
3747 this: cx.weak_entity(),
3748 git_store,
3749 snapshot,
3750 pending_ops: Default::default(),
3751 repository_state: state,
3752 commit_message_buffer: None,
3753 askpass_delegates: Default::default(),
3754 paths_needing_status_update: Default::default(),
3755 latest_askpass_id: 0,
3756 job_sender,
3757 job_id: 0,
3758 active_jobs: Default::default(),
3759 initial_graph_data: Default::default(),
3760 commit_data: Default::default(),
3761 graph_commit_data_handler: GraphCommitHandlerState::Closed,
3762 }
3763 }
3764
3765 fn remote(
3766 id: RepositoryId,
3767 work_directory_abs_path: Arc<Path>,
3768 path_style: PathStyle,
3769 project_id: ProjectId,
3770 client: AnyProtoClient,
3771 git_store: WeakEntity<GitStore>,
3772 cx: &mut Context<Self>,
3773 ) -> Self {
3774 let snapshot = RepositorySnapshot::empty(id, work_directory_abs_path, path_style);
3775 let repository_state = RemoteRepositoryState { project_id, client };
3776 let job_sender = Self::spawn_remote_git_worker(repository_state.clone(), cx);
3777 let repository_state = Task::ready(Ok(RepositoryState::Remote(repository_state))).shared();
3778 Self {
3779 this: cx.weak_entity(),
3780 snapshot,
3781 commit_message_buffer: None,
3782 git_store,
3783 pending_ops: Default::default(),
3784 paths_needing_status_update: Default::default(),
3785 job_sender,
3786 repository_state,
3787 askpass_delegates: Default::default(),
3788 latest_askpass_id: 0,
3789 active_jobs: Default::default(),
3790 job_id: 0,
3791 initial_graph_data: Default::default(),
3792 commit_data: Default::default(),
3793 graph_commit_data_handler: GraphCommitHandlerState::Closed,
3794 }
3795 }
3796
3797 pub fn git_store(&self) -> Option<Entity<GitStore>> {
3798 self.git_store.upgrade()
3799 }
3800
3801 fn reload_buffer_diff_bases(&mut self, cx: &mut Context<Self>) {
3802 let this = cx.weak_entity();
3803 let git_store = self.git_store.clone();
3804 let _ = self.send_keyed_job(
3805 Some(GitJobKey::ReloadBufferDiffBases),
3806 None,
3807 |state, mut cx| async move {
3808 let RepositoryState::Local(LocalRepositoryState { backend, .. }) = state else {
3809 log::error!("tried to recompute diffs for a non-local repository");
3810 return Ok(());
3811 };
3812
3813 let Some(this) = this.upgrade() else {
3814 return Ok(());
3815 };
3816
3817 let repo_diff_state_updates = this.update(&mut cx, |this, cx| {
3818 git_store.update(cx, |git_store, cx| {
3819 git_store
3820 .diffs
3821 .iter()
3822 .filter_map(|(buffer_id, diff_state)| {
3823 let buffer_store = git_store.buffer_store.read(cx);
3824 let buffer = buffer_store.get(*buffer_id)?;
3825 let file = File::from_dyn(buffer.read(cx).file())?;
3826 let abs_path = file.worktree.read(cx).absolutize(&file.path);
3827 let repo_path = this.abs_path_to_repo_path(&abs_path)?;
3828 log::debug!(
3829 "start reload diff bases for repo path {}",
3830 repo_path.as_unix_str()
3831 );
3832 diff_state.update(cx, |diff_state, _| {
3833 let has_unstaged_diff = diff_state
3834 .unstaged_diff
3835 .as_ref()
3836 .is_some_and(|diff| diff.is_upgradable());
3837 let has_uncommitted_diff = diff_state
3838 .uncommitted_diff
3839 .as_ref()
3840 .is_some_and(|set| set.is_upgradable());
3841
3842 Some((
3843 buffer,
3844 repo_path,
3845 has_unstaged_diff.then(|| diff_state.index_text.clone()),
3846 has_uncommitted_diff.then(|| diff_state.head_text.clone()),
3847 ))
3848 })
3849 })
3850 .collect::<Vec<_>>()
3851 })
3852 })?;
3853
3854 let buffer_diff_base_changes = cx
3855 .background_spawn(async move {
3856 let mut changes = Vec::new();
3857 for (buffer, repo_path, current_index_text, current_head_text) in
3858 &repo_diff_state_updates
3859 {
3860 let index_text = if current_index_text.is_some() {
3861 backend.load_index_text(repo_path.clone()).await
3862 } else {
3863 None
3864 };
3865 let head_text = if current_head_text.is_some() {
3866 backend.load_committed_text(repo_path.clone()).await
3867 } else {
3868 None
3869 };
3870
3871 let change =
3872 match (current_index_text.as_ref(), current_head_text.as_ref()) {
3873 (Some(current_index), Some(current_head)) => {
3874 let index_changed =
3875 index_text.as_deref() != current_index.as_deref();
3876 let head_changed =
3877 head_text.as_deref() != current_head.as_deref();
3878 if index_changed && head_changed {
3879 if index_text == head_text {
3880 Some(DiffBasesChange::SetBoth(head_text))
3881 } else {
3882 Some(DiffBasesChange::SetEach {
3883 index: index_text,
3884 head: head_text,
3885 })
3886 }
3887 } else if index_changed {
3888 Some(DiffBasesChange::SetIndex(index_text))
3889 } else if head_changed {
3890 Some(DiffBasesChange::SetHead(head_text))
3891 } else {
3892 None
3893 }
3894 }
3895 (Some(current_index), None) => {
3896 let index_changed =
3897 index_text.as_deref() != current_index.as_deref();
3898 index_changed
3899 .then_some(DiffBasesChange::SetIndex(index_text))
3900 }
3901 (None, Some(current_head)) => {
3902 let head_changed =
3903 head_text.as_deref() != current_head.as_deref();
3904 head_changed.then_some(DiffBasesChange::SetHead(head_text))
3905 }
3906 (None, None) => None,
3907 };
3908
3909 changes.push((buffer.clone(), change))
3910 }
3911 changes
3912 })
3913 .await;
3914
3915 git_store.update(&mut cx, |git_store, cx| {
3916 for (buffer, diff_bases_change) in buffer_diff_base_changes {
3917 let buffer_snapshot = buffer.read(cx).text_snapshot();
3918 let buffer_id = buffer_snapshot.remote_id();
3919 let Some(diff_state) = git_store.diffs.get(&buffer_id) else {
3920 continue;
3921 };
3922
3923 let downstream_client = git_store.downstream_client();
3924 diff_state.update(cx, |diff_state, cx| {
3925 use proto::update_diff_bases::Mode;
3926
3927 if let Some((diff_bases_change, (client, project_id))) =
3928 diff_bases_change.clone().zip(downstream_client)
3929 {
3930 let (staged_text, committed_text, mode) = match diff_bases_change {
3931 DiffBasesChange::SetIndex(index) => {
3932 (index, None, Mode::IndexOnly)
3933 }
3934 DiffBasesChange::SetHead(head) => (None, head, Mode::HeadOnly),
3935 DiffBasesChange::SetEach { index, head } => {
3936 (index, head, Mode::IndexAndHead)
3937 }
3938 DiffBasesChange::SetBoth(text) => {
3939 (None, text, Mode::IndexMatchesHead)
3940 }
3941 };
3942 client
3943 .send(proto::UpdateDiffBases {
3944 project_id: project_id.to_proto(),
3945 buffer_id: buffer_id.to_proto(),
3946 staged_text,
3947 committed_text,
3948 mode: mode as i32,
3949 })
3950 .log_err();
3951 }
3952
3953 diff_state.diff_bases_changed(buffer_snapshot, diff_bases_change, cx);
3954 });
3955 }
3956 })
3957 },
3958 );
3959 }
3960
3961 pub fn send_job<F, Fut, R>(
3962 &mut self,
3963 status: Option<SharedString>,
3964 job: F,
3965 ) -> oneshot::Receiver<R>
3966 where
3967 F: FnOnce(RepositoryState, AsyncApp) -> Fut + 'static,
3968 Fut: Future<Output = R> + 'static,
3969 R: Send + 'static,
3970 {
3971 self.send_keyed_job(None, status, job)
3972 }
3973
3974 fn send_keyed_job<F, Fut, R>(
3975 &mut self,
3976 key: Option<GitJobKey>,
3977 status: Option<SharedString>,
3978 job: F,
3979 ) -> oneshot::Receiver<R>
3980 where
3981 F: FnOnce(RepositoryState, AsyncApp) -> Fut + 'static,
3982 Fut: Future<Output = R> + 'static,
3983 R: Send + 'static,
3984 {
3985 let (result_tx, result_rx) = futures::channel::oneshot::channel();
3986 let job_id = post_inc(&mut self.job_id);
3987 let this = self.this.clone();
3988 self.job_sender
3989 .unbounded_send(GitJob {
3990 key,
3991 job: Box::new(move |state, cx: &mut AsyncApp| {
3992 let job = job(state, cx.clone());
3993 cx.spawn(async move |cx| {
3994 if let Some(s) = status.clone() {
3995 this.update(cx, |this, cx| {
3996 this.active_jobs.insert(
3997 job_id,
3998 JobInfo {
3999 start: Instant::now(),
4000 message: s.clone(),
4001 },
4002 );
4003
4004 cx.notify();
4005 })
4006 .ok();
4007 }
4008 let result = job.await;
4009
4010 this.update(cx, |this, cx| {
4011 this.active_jobs.remove(&job_id);
4012 cx.notify();
4013 })
4014 .ok();
4015
4016 result_tx.send(result).ok();
4017 })
4018 }),
4019 })
4020 .ok();
4021 result_rx
4022 }
4023
4024 pub fn set_as_active_repository(&self, cx: &mut Context<Self>) {
4025 let Some(git_store) = self.git_store.upgrade() else {
4026 return;
4027 };
4028 let entity = cx.entity();
4029 git_store.update(cx, |git_store, cx| {
4030 let Some((&id, _)) = git_store
4031 .repositories
4032 .iter()
4033 .find(|(_, handle)| *handle == &entity)
4034 else {
4035 return;
4036 };
4037 git_store.active_repo_id = Some(id);
4038 cx.emit(GitStoreEvent::ActiveRepositoryChanged(Some(id)));
4039 });
4040 }
4041
4042 pub fn cached_status(&self) -> impl '_ + Iterator<Item = StatusEntry> {
4043 self.snapshot.status()
4044 }
4045
4046 pub fn cached_stash(&self) -> GitStash {
4047 self.snapshot.stash_entries.clone()
4048 }
4049
4050 pub fn repo_path_to_project_path(&self, path: &RepoPath, cx: &App) -> Option<ProjectPath> {
4051 let git_store = self.git_store.upgrade()?;
4052 let worktree_store = git_store.read(cx).worktree_store.read(cx);
4053 let abs_path = self.snapshot.repo_path_to_abs_path(path);
4054 let abs_path = SanitizedPath::new(&abs_path);
4055 let (worktree, relative_path) = worktree_store.find_worktree(abs_path, cx)?;
4056 Some(ProjectPath {
4057 worktree_id: worktree.read(cx).id(),
4058 path: relative_path,
4059 })
4060 }
4061
4062 pub fn project_path_to_repo_path(&self, path: &ProjectPath, cx: &App) -> Option<RepoPath> {
4063 let git_store = self.git_store.upgrade()?;
4064 let worktree_store = git_store.read(cx).worktree_store.read(cx);
4065 let abs_path = worktree_store.absolutize(path, cx)?;
4066 self.snapshot.abs_path_to_repo_path(&abs_path)
4067 }
4068
4069 pub fn contains_sub_repo(&self, other: &Entity<Self>, cx: &App) -> bool {
4070 other
4071 .read(cx)
4072 .snapshot
4073 .work_directory_abs_path
4074 .starts_with(&self.snapshot.work_directory_abs_path)
4075 }
4076
4077 pub fn open_commit_buffer(
4078 &mut self,
4079 languages: Option<Arc<LanguageRegistry>>,
4080 buffer_store: Entity<BufferStore>,
4081 cx: &mut Context<Self>,
4082 ) -> Task<Result<Entity<Buffer>>> {
4083 let id = self.id;
4084 if let Some(buffer) = self.commit_message_buffer.clone() {
4085 return Task::ready(Ok(buffer));
4086 }
4087 let this = cx.weak_entity();
4088
4089 let rx = self.send_job(None, move |state, mut cx| async move {
4090 let Some(this) = this.upgrade() else {
4091 bail!("git store was dropped");
4092 };
4093 match state {
4094 RepositoryState::Local(..) => {
4095 this.update(&mut cx, |_, cx| {
4096 Self::open_local_commit_buffer(languages, buffer_store, cx)
4097 })
4098 .await
4099 }
4100 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
4101 let request = client.request(proto::OpenCommitMessageBuffer {
4102 project_id: project_id.0,
4103 repository_id: id.to_proto(),
4104 });
4105 let response = request.await.context("requesting to open commit buffer")?;
4106 let buffer_id = BufferId::new(response.buffer_id)?;
4107 let buffer = buffer_store
4108 .update(&mut cx, |buffer_store, cx| {
4109 buffer_store.wait_for_remote_buffer(buffer_id, cx)
4110 })
4111 .await?;
4112 if let Some(language_registry) = languages {
4113 let git_commit_language =
4114 language_registry.language_for_name("Git Commit").await?;
4115 buffer.update(&mut cx, |buffer, cx| {
4116 buffer.set_language(Some(git_commit_language), cx);
4117 });
4118 }
4119 this.update(&mut cx, |this, _| {
4120 this.commit_message_buffer = Some(buffer.clone());
4121 });
4122 Ok(buffer)
4123 }
4124 }
4125 });
4126
4127 cx.spawn(|_, _: &mut AsyncApp| async move { rx.await? })
4128 }
4129
4130 fn open_local_commit_buffer(
4131 language_registry: Option<Arc<LanguageRegistry>>,
4132 buffer_store: Entity<BufferStore>,
4133 cx: &mut Context<Self>,
4134 ) -> Task<Result<Entity<Buffer>>> {
4135 cx.spawn(async move |repository, cx| {
4136 let git_commit_language = match language_registry {
4137 Some(language_registry) => {
4138 Some(language_registry.language_for_name("Git Commit").await?)
4139 }
4140 None => None,
4141 };
4142 let buffer = buffer_store
4143 .update(cx, |buffer_store, cx| {
4144 buffer_store.create_buffer(git_commit_language, false, cx)
4145 })
4146 .await?;
4147
4148 repository.update(cx, |repository, _| {
4149 repository.commit_message_buffer = Some(buffer.clone());
4150 })?;
4151 Ok(buffer)
4152 })
4153 }
4154
4155 pub fn checkout_files(
4156 &mut self,
4157 commit: &str,
4158 paths: Vec<RepoPath>,
4159 cx: &mut Context<Self>,
4160 ) -> Task<Result<()>> {
4161 let commit = commit.to_string();
4162 let id = self.id;
4163
4164 self.spawn_job_with_tracking(
4165 paths.clone(),
4166 pending_op::GitStatus::Reverted,
4167 cx,
4168 async move |this, cx| {
4169 this.update(cx, |this, _cx| {
4170 this.send_job(
4171 Some(format!("git checkout {}", commit).into()),
4172 move |git_repo, _| async move {
4173 match git_repo {
4174 RepositoryState::Local(LocalRepositoryState {
4175 backend,
4176 environment,
4177 ..
4178 }) => {
4179 backend
4180 .checkout_files(commit, paths, environment.clone())
4181 .await
4182 }
4183 RepositoryState::Remote(RemoteRepositoryState {
4184 project_id,
4185 client,
4186 }) => {
4187 client
4188 .request(proto::GitCheckoutFiles {
4189 project_id: project_id.0,
4190 repository_id: id.to_proto(),
4191 commit,
4192 paths: paths
4193 .into_iter()
4194 .map(|p| p.to_proto())
4195 .collect(),
4196 })
4197 .await?;
4198
4199 Ok(())
4200 }
4201 }
4202 },
4203 )
4204 })?
4205 .await?
4206 },
4207 )
4208 }
4209
4210 pub fn reset(
4211 &mut self,
4212 commit: String,
4213 reset_mode: ResetMode,
4214 _cx: &mut App,
4215 ) -> oneshot::Receiver<Result<()>> {
4216 let id = self.id;
4217
4218 self.send_job(None, move |git_repo, _| async move {
4219 match git_repo {
4220 RepositoryState::Local(LocalRepositoryState {
4221 backend,
4222 environment,
4223 ..
4224 }) => backend.reset(commit, reset_mode, environment).await,
4225 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
4226 client
4227 .request(proto::GitReset {
4228 project_id: project_id.0,
4229 repository_id: id.to_proto(),
4230 commit,
4231 mode: match reset_mode {
4232 ResetMode::Soft => git_reset::ResetMode::Soft.into(),
4233 ResetMode::Mixed => git_reset::ResetMode::Mixed.into(),
4234 },
4235 })
4236 .await?;
4237
4238 Ok(())
4239 }
4240 }
4241 })
4242 }
4243
4244 pub fn show(&mut self, commit: String) -> oneshot::Receiver<Result<CommitDetails>> {
4245 let id = self.id;
4246 self.send_job(None, move |git_repo, _cx| async move {
4247 match git_repo {
4248 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
4249 backend.show(commit).await
4250 }
4251 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
4252 let resp = client
4253 .request(proto::GitShow {
4254 project_id: project_id.0,
4255 repository_id: id.to_proto(),
4256 commit,
4257 })
4258 .await?;
4259
4260 Ok(CommitDetails {
4261 sha: resp.sha.into(),
4262 message: resp.message.into(),
4263 commit_timestamp: resp.commit_timestamp,
4264 author_email: resp.author_email.into(),
4265 author_name: resp.author_name.into(),
4266 })
4267 }
4268 }
4269 })
4270 }
4271
4272 pub fn load_commit_diff(&mut self, commit: String) -> oneshot::Receiver<Result<CommitDiff>> {
4273 let id = self.id;
4274 self.send_job(None, move |git_repo, cx| async move {
4275 match git_repo {
4276 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
4277 backend.load_commit(commit, cx).await
4278 }
4279 RepositoryState::Remote(RemoteRepositoryState {
4280 client, project_id, ..
4281 }) => {
4282 let response = client
4283 .request(proto::LoadCommitDiff {
4284 project_id: project_id.0,
4285 repository_id: id.to_proto(),
4286 commit,
4287 })
4288 .await?;
4289 Ok(CommitDiff {
4290 files: response
4291 .files
4292 .into_iter()
4293 .map(|file| {
4294 Ok(CommitFile {
4295 path: RepoPath::from_proto(&file.path)?,
4296 old_text: file.old_text,
4297 new_text: file.new_text,
4298 is_binary: file.is_binary,
4299 })
4300 })
4301 .collect::<Result<Vec<_>>>()?,
4302 })
4303 }
4304 }
4305 })
4306 }
4307
4308 pub fn file_history(
4309 &mut self,
4310 path: RepoPath,
4311 ) -> oneshot::Receiver<Result<git::repository::FileHistory>> {
4312 self.file_history_paginated(path, 0, None)
4313 }
4314
4315 pub fn file_history_paginated(
4316 &mut self,
4317 path: RepoPath,
4318 skip: usize,
4319 limit: Option<usize>,
4320 ) -> oneshot::Receiver<Result<git::repository::FileHistory>> {
4321 let id = self.id;
4322 self.send_job(None, move |git_repo, _cx| async move {
4323 match git_repo {
4324 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
4325 backend.file_history_paginated(path, skip, limit).await
4326 }
4327 RepositoryState::Remote(RemoteRepositoryState { client, project_id }) => {
4328 let response = client
4329 .request(proto::GitFileHistory {
4330 project_id: project_id.0,
4331 repository_id: id.to_proto(),
4332 path: path.to_proto(),
4333 skip: skip as u64,
4334 limit: limit.map(|l| l as u64),
4335 })
4336 .await?;
4337 Ok(git::repository::FileHistory {
4338 entries: response
4339 .entries
4340 .into_iter()
4341 .map(|entry| git::repository::FileHistoryEntry {
4342 sha: entry.sha.into(),
4343 subject: entry.subject.into(),
4344 message: entry.message.into(),
4345 commit_timestamp: entry.commit_timestamp,
4346 author_name: entry.author_name.into(),
4347 author_email: entry.author_email.into(),
4348 })
4349 .collect(),
4350 path: RepoPath::from_proto(&response.path)?,
4351 })
4352 }
4353 }
4354 })
4355 }
4356
4357 pub fn graph_data(
4358 &mut self,
4359 log_source: LogSource,
4360 log_order: LogOrder,
4361 range: Range<usize>,
4362 cx: &mut Context<Self>,
4363 ) -> (&[Arc<InitialGraphCommitData>], bool) {
4364 let (loading_task, initial_commit_data) = self
4365 .initial_graph_data
4366 .entry((log_order, log_source.clone()))
4367 .or_insert_with(|| {
4368 let state = self.repository_state.clone();
4369 let log_source = log_source.clone();
4370 (
4371 cx.spawn(async move |repository, cx| {
4372 let state = state.await;
4373 match state {
4374 Ok(RepositoryState::Local(LocalRepositoryState {
4375 backend, ..
4376 })) => {
4377 Self::local_git_graph_data(
4378 repository, backend, log_source, log_order, cx,
4379 )
4380 .await
4381 }
4382 Ok(RepositoryState::Remote(_)) => {
4383 Err("Git graph is not supported for collab yet".into())
4384 }
4385 Err(e) => Err(SharedString::from(e)),
4386 }
4387 }),
4388 vec![],
4389 )
4390 });
4391
4392 let max_start = initial_commit_data.len().saturating_sub(1);
4393 let max_end = initial_commit_data.len();
4394 (
4395 &initial_commit_data[range.start.min(max_start)..range.end.min(max_end)],
4396 !loading_task.is_ready(),
4397 )
4398 }
4399
4400 async fn local_git_graph_data(
4401 this: WeakEntity<Self>,
4402 backend: Arc<dyn GitRepository>,
4403 log_source: LogSource,
4404 log_order: LogOrder,
4405 cx: &mut AsyncApp,
4406 ) -> Result<(), SharedString> {
4407 let (request_tx, request_rx) =
4408 smol::channel::unbounded::<Vec<Arc<InitialGraphCommitData>>>();
4409
4410 let task = cx.background_executor().spawn({
4411 let log_source = log_source.clone();
4412 async move {
4413 backend
4414 .initial_graph_data(log_source, log_order, request_tx)
4415 .await
4416 .map_err(|err| SharedString::from(err.to_string()))
4417 }
4418 });
4419
4420 let graph_data_key = (log_order, log_source.clone());
4421
4422 while let Ok(initial_graph_commit_data) = request_rx.recv().await {
4423 this.update(cx, |repository, cx| {
4424 let graph_data = repository
4425 .initial_graph_data
4426 .get_mut(&graph_data_key)
4427 .map(|(_, graph_data)| graph_data);
4428 debug_assert!(
4429 graph_data.is_some(),
4430 "This task should be dropped if data doesn't exist"
4431 );
4432
4433 if let Some(graph_data) = graph_data {
4434 graph_data.extend(initial_graph_commit_data);
4435 cx.emit(RepositoryEvent::GitGraphCountUpdated(
4436 graph_data_key.clone(),
4437 graph_data.len(),
4438 ));
4439 }
4440 })
4441 .ok();
4442 }
4443
4444 task.await?;
4445
4446 Ok(())
4447 }
4448
4449 pub fn fetch_commit_data(&mut self, sha: Oid, cx: &mut Context<Self>) -> &CommitDataState {
4450 if !self.commit_data.contains_key(&sha) {
4451 match &self.graph_commit_data_handler {
4452 GraphCommitHandlerState::Open(handler) => {
4453 if handler.commit_data_request.try_send(sha).is_ok() {
4454 let old_value = self.commit_data.insert(sha, CommitDataState::Loading);
4455 debug_assert!(old_value.is_none(), "We should never overwrite commit data");
4456 }
4457 }
4458 GraphCommitHandlerState::Closed => {
4459 self.open_graph_commit_data_handler(cx);
4460 }
4461 GraphCommitHandlerState::Starting => {}
4462 }
4463 }
4464
4465 self.commit_data
4466 .get(&sha)
4467 .unwrap_or(&CommitDataState::Loading)
4468 }
4469
4470 fn open_graph_commit_data_handler(&mut self, cx: &mut Context<Self>) {
4471 self.graph_commit_data_handler = GraphCommitHandlerState::Starting;
4472
4473 let state = self.repository_state.clone();
4474 let (result_tx, result_rx) = smol::channel::bounded::<(Oid, GraphCommitData)>(64);
4475 let (request_tx, request_rx) = smol::channel::unbounded::<Oid>();
4476
4477 let foreground_task = cx.spawn(async move |this, cx| {
4478 while let Ok((sha, commit_data)) = result_rx.recv().await {
4479 let result = this.update(cx, |this, cx| {
4480 let old_value = this
4481 .commit_data
4482 .insert(sha, CommitDataState::Loaded(Arc::new(commit_data)));
4483 debug_assert!(
4484 !matches!(old_value, Some(CommitDataState::Loaded(_))),
4485 "We should never overwrite commit data"
4486 );
4487
4488 cx.notify();
4489 });
4490 if result.is_err() {
4491 break;
4492 }
4493 }
4494
4495 this.update(cx, |this, _cx| {
4496 this.graph_commit_data_handler = GraphCommitHandlerState::Closed;
4497 })
4498 .ok();
4499 });
4500
4501 let request_tx_for_handler = request_tx;
4502 let background_executor = cx.background_executor().clone();
4503
4504 cx.background_spawn(async move {
4505 let backend = match state.await {
4506 Ok(RepositoryState::Local(LocalRepositoryState { backend, .. })) => backend,
4507 Ok(RepositoryState::Remote(_)) => {
4508 log::error!("commit_data_reader not supported for remote repositories");
4509 return;
4510 }
4511 Err(error) => {
4512 log::error!("failed to get repository state: {error}");
4513 return;
4514 }
4515 };
4516
4517 let reader = match backend.commit_data_reader() {
4518 Ok(reader) => reader,
4519 Err(error) => {
4520 log::error!("failed to create commit data reader: {error:?}");
4521 return;
4522 }
4523 };
4524
4525 loop {
4526 let timeout = background_executor.timer(std::time::Duration::from_secs(10));
4527
4528 futures::select_biased! {
4529 sha = futures::FutureExt::fuse(request_rx.recv()) => {
4530 let Ok(sha) = sha else {
4531 break;
4532 };
4533
4534 match reader.read(sha).await {
4535 Ok(commit_data) => {
4536 if result_tx.send((sha, commit_data)).await.is_err() {
4537 break;
4538 }
4539 }
4540 Err(error) => {
4541 log::error!("failed to read commit data for {sha}: {error:?}");
4542 }
4543 }
4544 }
4545 _ = futures::FutureExt::fuse(timeout) => {
4546 break;
4547 }
4548 }
4549 }
4550
4551 drop(result_tx);
4552 })
4553 .detach();
4554
4555 self.graph_commit_data_handler = GraphCommitHandlerState::Open(GraphCommitDataHandler {
4556 _task: foreground_task,
4557 commit_data_request: request_tx_for_handler,
4558 });
4559 }
4560
4561 fn buffer_store(&self, cx: &App) -> Option<Entity<BufferStore>> {
4562 Some(self.git_store.upgrade()?.read(cx).buffer_store.clone())
4563 }
4564
4565 fn save_buffers<'a>(
4566 &self,
4567 entries: impl IntoIterator<Item = &'a RepoPath>,
4568 cx: &mut Context<Self>,
4569 ) -> Vec<Task<anyhow::Result<()>>> {
4570 let mut save_futures = Vec::new();
4571 if let Some(buffer_store) = self.buffer_store(cx) {
4572 buffer_store.update(cx, |buffer_store, cx| {
4573 for path in entries {
4574 let Some(project_path) = self.repo_path_to_project_path(path, cx) else {
4575 continue;
4576 };
4577 if let Some(buffer) = buffer_store.get_by_path(&project_path)
4578 && buffer
4579 .read(cx)
4580 .file()
4581 .is_some_and(|file| file.disk_state().exists())
4582 && buffer.read(cx).has_unsaved_edits()
4583 {
4584 save_futures.push(buffer_store.save_buffer(buffer, cx));
4585 }
4586 }
4587 })
4588 }
4589 save_futures
4590 }
4591
4592 pub fn stage_entries(
4593 &mut self,
4594 entries: Vec<RepoPath>,
4595 cx: &mut Context<Self>,
4596 ) -> Task<anyhow::Result<()>> {
4597 self.stage_or_unstage_entries(true, entries, cx)
4598 }
4599
4600 pub fn unstage_entries(
4601 &mut self,
4602 entries: Vec<RepoPath>,
4603 cx: &mut Context<Self>,
4604 ) -> Task<anyhow::Result<()>> {
4605 self.stage_or_unstage_entries(false, entries, cx)
4606 }
4607
4608 fn stage_or_unstage_entries(
4609 &mut self,
4610 stage: bool,
4611 entries: Vec<RepoPath>,
4612 cx: &mut Context<Self>,
4613 ) -> Task<anyhow::Result<()>> {
4614 if entries.is_empty() {
4615 return Task::ready(Ok(()));
4616 }
4617 let Some(git_store) = self.git_store.upgrade() else {
4618 return Task::ready(Ok(()));
4619 };
4620 let id = self.id;
4621 let save_tasks = self.save_buffers(&entries, cx);
4622 let paths = entries
4623 .iter()
4624 .map(|p| p.as_unix_str())
4625 .collect::<Vec<_>>()
4626 .join(" ");
4627 let status = if stage {
4628 format!("git add {paths}")
4629 } else {
4630 format!("git reset {paths}")
4631 };
4632 let job_key = GitJobKey::WriteIndex(entries.clone());
4633
4634 self.spawn_job_with_tracking(
4635 entries.clone(),
4636 if stage {
4637 pending_op::GitStatus::Staged
4638 } else {
4639 pending_op::GitStatus::Unstaged
4640 },
4641 cx,
4642 async move |this, cx| {
4643 for save_task in save_tasks {
4644 save_task.await?;
4645 }
4646
4647 this.update(cx, |this, cx| {
4648 let weak_this = cx.weak_entity();
4649 this.send_keyed_job(
4650 Some(job_key),
4651 Some(status.into()),
4652 move |git_repo, mut cx| async move {
4653 let hunk_staging_operation_counts = weak_this
4654 .update(&mut cx, |this, cx| {
4655 let mut hunk_staging_operation_counts = HashMap::default();
4656 for path in &entries {
4657 let Some(project_path) =
4658 this.repo_path_to_project_path(path, cx)
4659 else {
4660 continue;
4661 };
4662 let Some(buffer) = git_store
4663 .read(cx)
4664 .buffer_store
4665 .read(cx)
4666 .get_by_path(&project_path)
4667 else {
4668 continue;
4669 };
4670 let Some(diff_state) = git_store
4671 .read(cx)
4672 .diffs
4673 .get(&buffer.read(cx).remote_id())
4674 .cloned()
4675 else {
4676 continue;
4677 };
4678 let Some(uncommitted_diff) =
4679 diff_state.read(cx).uncommitted_diff.as_ref().and_then(
4680 |uncommitted_diff| uncommitted_diff.upgrade(),
4681 )
4682 else {
4683 continue;
4684 };
4685 let buffer_snapshot = buffer.read(cx).text_snapshot();
4686 let file_exists = buffer
4687 .read(cx)
4688 .file()
4689 .is_some_and(|file| file.disk_state().exists());
4690 let hunk_staging_operation_count =
4691 diff_state.update(cx, |diff_state, cx| {
4692 uncommitted_diff.update(
4693 cx,
4694 |uncommitted_diff, cx| {
4695 uncommitted_diff
4696 .stage_or_unstage_all_hunks(
4697 stage,
4698 &buffer_snapshot,
4699 file_exists,
4700 cx,
4701 );
4702 },
4703 );
4704
4705 diff_state.hunk_staging_operation_count += 1;
4706 diff_state.hunk_staging_operation_count
4707 });
4708 hunk_staging_operation_counts.insert(
4709 diff_state.downgrade(),
4710 hunk_staging_operation_count,
4711 );
4712 }
4713 hunk_staging_operation_counts
4714 })
4715 .unwrap_or_default();
4716
4717 let result = match git_repo {
4718 RepositoryState::Local(LocalRepositoryState {
4719 backend,
4720 environment,
4721 ..
4722 }) => {
4723 if stage {
4724 backend.stage_paths(entries, environment.clone()).await
4725 } else {
4726 backend.unstage_paths(entries, environment.clone()).await
4727 }
4728 }
4729 RepositoryState::Remote(RemoteRepositoryState {
4730 project_id,
4731 client,
4732 }) => {
4733 if stage {
4734 client
4735 .request(proto::Stage {
4736 project_id: project_id.0,
4737 repository_id: id.to_proto(),
4738 paths: entries
4739 .into_iter()
4740 .map(|repo_path| repo_path.to_proto())
4741 .collect(),
4742 })
4743 .await
4744 .context("sending stage request")
4745 .map(|_| ())
4746 } else {
4747 client
4748 .request(proto::Unstage {
4749 project_id: project_id.0,
4750 repository_id: id.to_proto(),
4751 paths: entries
4752 .into_iter()
4753 .map(|repo_path| repo_path.to_proto())
4754 .collect(),
4755 })
4756 .await
4757 .context("sending unstage request")
4758 .map(|_| ())
4759 }
4760 }
4761 };
4762
4763 for (diff_state, hunk_staging_operation_count) in
4764 hunk_staging_operation_counts
4765 {
4766 diff_state
4767 .update(&mut cx, |diff_state, cx| {
4768 if result.is_ok() {
4769 diff_state.hunk_staging_operation_count_as_of_write =
4770 hunk_staging_operation_count;
4771 } else if let Some(uncommitted_diff) =
4772 &diff_state.uncommitted_diff
4773 {
4774 uncommitted_diff
4775 .update(cx, |uncommitted_diff, cx| {
4776 uncommitted_diff.clear_pending_hunks(cx);
4777 })
4778 .ok();
4779 }
4780 })
4781 .ok();
4782 }
4783
4784 result
4785 },
4786 )
4787 })?
4788 .await?
4789 },
4790 )
4791 }
4792
4793 pub fn stage_all(&mut self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
4794 let to_stage = self
4795 .cached_status()
4796 .filter_map(|entry| {
4797 if let Some(ops) = self.pending_ops_for_path(&entry.repo_path) {
4798 if ops.staging() || ops.staged() {
4799 None
4800 } else {
4801 Some(entry.repo_path)
4802 }
4803 } else if entry.status.staging().is_fully_staged() {
4804 None
4805 } else {
4806 Some(entry.repo_path)
4807 }
4808 })
4809 .collect();
4810 self.stage_or_unstage_entries(true, to_stage, cx)
4811 }
4812
4813 pub fn unstage_all(&mut self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
4814 let to_unstage = self
4815 .cached_status()
4816 .filter_map(|entry| {
4817 if let Some(ops) = self.pending_ops_for_path(&entry.repo_path) {
4818 if !ops.staging() && !ops.staged() {
4819 None
4820 } else {
4821 Some(entry.repo_path)
4822 }
4823 } else if entry.status.staging().is_fully_unstaged() {
4824 None
4825 } else {
4826 Some(entry.repo_path)
4827 }
4828 })
4829 .collect();
4830 self.stage_or_unstage_entries(false, to_unstage, cx)
4831 }
4832
4833 pub fn stash_all(&mut self, cx: &mut Context<Self>) -> Task<anyhow::Result<()>> {
4834 let to_stash = self.cached_status().map(|entry| entry.repo_path).collect();
4835
4836 self.stash_entries(to_stash, cx)
4837 }
4838
4839 pub fn stash_entries(
4840 &mut self,
4841 entries: Vec<RepoPath>,
4842 cx: &mut Context<Self>,
4843 ) -> Task<anyhow::Result<()>> {
4844 let id = self.id;
4845
4846 cx.spawn(async move |this, cx| {
4847 this.update(cx, |this, _| {
4848 this.send_job(None, move |git_repo, _cx| async move {
4849 match git_repo {
4850 RepositoryState::Local(LocalRepositoryState {
4851 backend,
4852 environment,
4853 ..
4854 }) => backend.stash_paths(entries, environment).await,
4855 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
4856 client
4857 .request(proto::Stash {
4858 project_id: project_id.0,
4859 repository_id: id.to_proto(),
4860 paths: entries
4861 .into_iter()
4862 .map(|repo_path| repo_path.to_proto())
4863 .collect(),
4864 })
4865 .await
4866 .context("sending stash request")?;
4867 Ok(())
4868 }
4869 }
4870 })
4871 })?
4872 .await??;
4873 Ok(())
4874 })
4875 }
4876
4877 pub fn stash_pop(
4878 &mut self,
4879 index: Option<usize>,
4880 cx: &mut Context<Self>,
4881 ) -> Task<anyhow::Result<()>> {
4882 let id = self.id;
4883 cx.spawn(async move |this, cx| {
4884 this.update(cx, |this, _| {
4885 this.send_job(None, move |git_repo, _cx| async move {
4886 match git_repo {
4887 RepositoryState::Local(LocalRepositoryState {
4888 backend,
4889 environment,
4890 ..
4891 }) => backend.stash_pop(index, environment).await,
4892 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
4893 client
4894 .request(proto::StashPop {
4895 project_id: project_id.0,
4896 repository_id: id.to_proto(),
4897 stash_index: index.map(|i| i as u64),
4898 })
4899 .await
4900 .context("sending stash pop request")?;
4901 Ok(())
4902 }
4903 }
4904 })
4905 })?
4906 .await??;
4907 Ok(())
4908 })
4909 }
4910
4911 pub fn stash_apply(
4912 &mut self,
4913 index: Option<usize>,
4914 cx: &mut Context<Self>,
4915 ) -> Task<anyhow::Result<()>> {
4916 let id = self.id;
4917 cx.spawn(async move |this, cx| {
4918 this.update(cx, |this, _| {
4919 this.send_job(None, move |git_repo, _cx| async move {
4920 match git_repo {
4921 RepositoryState::Local(LocalRepositoryState {
4922 backend,
4923 environment,
4924 ..
4925 }) => backend.stash_apply(index, environment).await,
4926 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
4927 client
4928 .request(proto::StashApply {
4929 project_id: project_id.0,
4930 repository_id: id.to_proto(),
4931 stash_index: index.map(|i| i as u64),
4932 })
4933 .await
4934 .context("sending stash apply request")?;
4935 Ok(())
4936 }
4937 }
4938 })
4939 })?
4940 .await??;
4941 Ok(())
4942 })
4943 }
4944
4945 pub fn stash_drop(
4946 &mut self,
4947 index: Option<usize>,
4948 cx: &mut Context<Self>,
4949 ) -> oneshot::Receiver<anyhow::Result<()>> {
4950 let id = self.id;
4951 let updates_tx = self
4952 .git_store()
4953 .and_then(|git_store| match &git_store.read(cx).state {
4954 GitStoreState::Local { downstream, .. } => downstream
4955 .as_ref()
4956 .map(|downstream| downstream.updates_tx.clone()),
4957 _ => None,
4958 });
4959 let this = cx.weak_entity();
4960 self.send_job(None, move |git_repo, mut cx| async move {
4961 match git_repo {
4962 RepositoryState::Local(LocalRepositoryState {
4963 backend,
4964 environment,
4965 ..
4966 }) => {
4967 // TODO would be nice to not have to do this manually
4968 let result = backend.stash_drop(index, environment).await;
4969 if result.is_ok()
4970 && let Ok(stash_entries) = backend.stash_entries().await
4971 {
4972 let snapshot = this.update(&mut cx, |this, cx| {
4973 this.snapshot.stash_entries = stash_entries;
4974 cx.emit(RepositoryEvent::StashEntriesChanged);
4975 this.snapshot.clone()
4976 })?;
4977 if let Some(updates_tx) = updates_tx {
4978 updates_tx
4979 .unbounded_send(DownstreamUpdate::UpdateRepository(snapshot))
4980 .ok();
4981 }
4982 }
4983
4984 result
4985 }
4986 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
4987 client
4988 .request(proto::StashDrop {
4989 project_id: project_id.0,
4990 repository_id: id.to_proto(),
4991 stash_index: index.map(|i| i as u64),
4992 })
4993 .await
4994 .context("sending stash pop request")?;
4995 Ok(())
4996 }
4997 }
4998 })
4999 }
5000
5001 pub fn run_hook(&mut self, hook: RunHook, _cx: &mut App) -> oneshot::Receiver<Result<()>> {
5002 let id = self.id;
5003 self.send_job(
5004 Some(format!("git hook {}", hook.as_str()).into()),
5005 move |git_repo, _cx| async move {
5006 match git_repo {
5007 RepositoryState::Local(LocalRepositoryState {
5008 backend,
5009 environment,
5010 ..
5011 }) => backend.run_hook(hook, environment.clone()).await,
5012 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5013 client
5014 .request(proto::RunGitHook {
5015 project_id: project_id.0,
5016 repository_id: id.to_proto(),
5017 hook: hook.to_proto(),
5018 })
5019 .await?;
5020
5021 Ok(())
5022 }
5023 }
5024 },
5025 )
5026 }
5027
5028 pub fn commit(
5029 &mut self,
5030 message: SharedString,
5031 name_and_email: Option<(SharedString, SharedString)>,
5032 options: CommitOptions,
5033 askpass: AskPassDelegate,
5034 cx: &mut App,
5035 ) -> oneshot::Receiver<Result<()>> {
5036 let id = self.id;
5037 let askpass_delegates = self.askpass_delegates.clone();
5038 let askpass_id = util::post_inc(&mut self.latest_askpass_id);
5039
5040 let rx = self.run_hook(RunHook::PreCommit, cx);
5041
5042 self.send_job(Some("git commit".into()), move |git_repo, _cx| async move {
5043 rx.await??;
5044
5045 match git_repo {
5046 RepositoryState::Local(LocalRepositoryState {
5047 backend,
5048 environment,
5049 ..
5050 }) => {
5051 backend
5052 .commit(message, name_and_email, options, askpass, environment)
5053 .await
5054 }
5055 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5056 askpass_delegates.lock().insert(askpass_id, askpass);
5057 let _defer = util::defer(|| {
5058 let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
5059 debug_assert!(askpass_delegate.is_some());
5060 });
5061 let (name, email) = name_and_email.unzip();
5062 client
5063 .request(proto::Commit {
5064 project_id: project_id.0,
5065 repository_id: id.to_proto(),
5066 message: String::from(message),
5067 name: name.map(String::from),
5068 email: email.map(String::from),
5069 options: Some(proto::commit::CommitOptions {
5070 amend: options.amend,
5071 signoff: options.signoff,
5072 }),
5073 askpass_id,
5074 })
5075 .await
5076 .context("sending commit request")?;
5077
5078 Ok(())
5079 }
5080 }
5081 })
5082 }
5083
5084 pub fn fetch(
5085 &mut self,
5086 fetch_options: FetchOptions,
5087 askpass: AskPassDelegate,
5088 _cx: &mut App,
5089 ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
5090 let askpass_delegates = self.askpass_delegates.clone();
5091 let askpass_id = util::post_inc(&mut self.latest_askpass_id);
5092 let id = self.id;
5093
5094 self.send_job(Some("git fetch".into()), move |git_repo, cx| async move {
5095 match git_repo {
5096 RepositoryState::Local(LocalRepositoryState {
5097 backend,
5098 environment,
5099 ..
5100 }) => backend.fetch(fetch_options, askpass, environment, cx).await,
5101 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5102 askpass_delegates.lock().insert(askpass_id, askpass);
5103 let _defer = util::defer(|| {
5104 let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
5105 debug_assert!(askpass_delegate.is_some());
5106 });
5107
5108 let response = client
5109 .request(proto::Fetch {
5110 project_id: project_id.0,
5111 repository_id: id.to_proto(),
5112 askpass_id,
5113 remote: fetch_options.to_proto(),
5114 })
5115 .await
5116 .context("sending fetch request")?;
5117
5118 Ok(RemoteCommandOutput {
5119 stdout: response.stdout,
5120 stderr: response.stderr,
5121 })
5122 }
5123 }
5124 })
5125 }
5126
5127 pub fn push(
5128 &mut self,
5129 branch: SharedString,
5130 remote_branch: SharedString,
5131 remote: SharedString,
5132 options: Option<PushOptions>,
5133 askpass: AskPassDelegate,
5134 cx: &mut Context<Self>,
5135 ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
5136 let askpass_delegates = self.askpass_delegates.clone();
5137 let askpass_id = util::post_inc(&mut self.latest_askpass_id);
5138 let id = self.id;
5139
5140 let args = options
5141 .map(|option| match option {
5142 PushOptions::SetUpstream => " --set-upstream",
5143 PushOptions::Force => " --force-with-lease",
5144 })
5145 .unwrap_or("");
5146
5147 let updates_tx = self
5148 .git_store()
5149 .and_then(|git_store| match &git_store.read(cx).state {
5150 GitStoreState::Local { downstream, .. } => downstream
5151 .as_ref()
5152 .map(|downstream| downstream.updates_tx.clone()),
5153 _ => None,
5154 });
5155
5156 let this = cx.weak_entity();
5157 self.send_job(
5158 Some(format!("git push {} {} {}:{}", args, remote, branch, remote_branch).into()),
5159 move |git_repo, mut cx| async move {
5160 match git_repo {
5161 RepositoryState::Local(LocalRepositoryState {
5162 backend,
5163 environment,
5164 ..
5165 }) => {
5166 let result = backend
5167 .push(
5168 branch.to_string(),
5169 remote_branch.to_string(),
5170 remote.to_string(),
5171 options,
5172 askpass,
5173 environment.clone(),
5174 cx.clone(),
5175 )
5176 .await;
5177 // TODO would be nice to not have to do this manually
5178 if result.is_ok() {
5179 let branches = backend.branches().await?;
5180 let branch = branches.into_iter().find(|branch| branch.is_head);
5181 log::info!("head branch after scan is {branch:?}");
5182 let snapshot = this.update(&mut cx, |this, cx| {
5183 this.snapshot.branch = branch;
5184 cx.emit(RepositoryEvent::BranchChanged);
5185 this.snapshot.clone()
5186 })?;
5187 if let Some(updates_tx) = updates_tx {
5188 updates_tx
5189 .unbounded_send(DownstreamUpdate::UpdateRepository(snapshot))
5190 .ok();
5191 }
5192 }
5193 result
5194 }
5195 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5196 askpass_delegates.lock().insert(askpass_id, askpass);
5197 let _defer = util::defer(|| {
5198 let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
5199 debug_assert!(askpass_delegate.is_some());
5200 });
5201 let response = client
5202 .request(proto::Push {
5203 project_id: project_id.0,
5204 repository_id: id.to_proto(),
5205 askpass_id,
5206 branch_name: branch.to_string(),
5207 remote_branch_name: remote_branch.to_string(),
5208 remote_name: remote.to_string(),
5209 options: options.map(|options| match options {
5210 PushOptions::Force => proto::push::PushOptions::Force,
5211 PushOptions::SetUpstream => {
5212 proto::push::PushOptions::SetUpstream
5213 }
5214 }
5215 as i32),
5216 })
5217 .await
5218 .context("sending push request")?;
5219
5220 Ok(RemoteCommandOutput {
5221 stdout: response.stdout,
5222 stderr: response.stderr,
5223 })
5224 }
5225 }
5226 },
5227 )
5228 }
5229
5230 pub fn pull(
5231 &mut self,
5232 branch: Option<SharedString>,
5233 remote: SharedString,
5234 rebase: bool,
5235 askpass: AskPassDelegate,
5236 _cx: &mut App,
5237 ) -> oneshot::Receiver<Result<RemoteCommandOutput>> {
5238 let askpass_delegates = self.askpass_delegates.clone();
5239 let askpass_id = util::post_inc(&mut self.latest_askpass_id);
5240 let id = self.id;
5241
5242 let mut status = "git pull".to_string();
5243 if rebase {
5244 status.push_str(" --rebase");
5245 }
5246 status.push_str(&format!(" {}", remote));
5247 if let Some(b) = &branch {
5248 status.push_str(&format!(" {}", b));
5249 }
5250
5251 self.send_job(Some(status.into()), move |git_repo, cx| async move {
5252 match git_repo {
5253 RepositoryState::Local(LocalRepositoryState {
5254 backend,
5255 environment,
5256 ..
5257 }) => {
5258 backend
5259 .pull(
5260 branch.as_ref().map(|b| b.to_string()),
5261 remote.to_string(),
5262 rebase,
5263 askpass,
5264 environment.clone(),
5265 cx,
5266 )
5267 .await
5268 }
5269 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5270 askpass_delegates.lock().insert(askpass_id, askpass);
5271 let _defer = util::defer(|| {
5272 let askpass_delegate = askpass_delegates.lock().remove(&askpass_id);
5273 debug_assert!(askpass_delegate.is_some());
5274 });
5275 let response = client
5276 .request(proto::Pull {
5277 project_id: project_id.0,
5278 repository_id: id.to_proto(),
5279 askpass_id,
5280 rebase,
5281 branch_name: branch.as_ref().map(|b| b.to_string()),
5282 remote_name: remote.to_string(),
5283 })
5284 .await
5285 .context("sending pull request")?;
5286
5287 Ok(RemoteCommandOutput {
5288 stdout: response.stdout,
5289 stderr: response.stderr,
5290 })
5291 }
5292 }
5293 })
5294 }
5295
5296 fn spawn_set_index_text_job(
5297 &mut self,
5298 path: RepoPath,
5299 content: Option<String>,
5300 hunk_staging_operation_count: Option<usize>,
5301 cx: &mut Context<Self>,
5302 ) -> oneshot::Receiver<anyhow::Result<()>> {
5303 let id = self.id;
5304 let this = cx.weak_entity();
5305 let git_store = self.git_store.clone();
5306 let abs_path = self.snapshot.repo_path_to_abs_path(&path);
5307 self.send_keyed_job(
5308 Some(GitJobKey::WriteIndex(vec![path.clone()])),
5309 None,
5310 move |git_repo, mut cx| async move {
5311 log::debug!(
5312 "start updating index text for buffer {}",
5313 path.as_unix_str()
5314 );
5315
5316 match git_repo {
5317 RepositoryState::Local(LocalRepositoryState {
5318 fs,
5319 backend,
5320 environment,
5321 ..
5322 }) => {
5323 let executable = match fs.metadata(&abs_path).await {
5324 Ok(Some(meta)) => meta.is_executable,
5325 Ok(None) => false,
5326 Err(_err) => false,
5327 };
5328 backend
5329 .set_index_text(path.clone(), content, environment.clone(), executable)
5330 .await?;
5331 }
5332 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5333 client
5334 .request(proto::SetIndexText {
5335 project_id: project_id.0,
5336 repository_id: id.to_proto(),
5337 path: path.to_proto(),
5338 text: content,
5339 })
5340 .await?;
5341 }
5342 }
5343 log::debug!(
5344 "finish updating index text for buffer {}",
5345 path.as_unix_str()
5346 );
5347
5348 if let Some(hunk_staging_operation_count) = hunk_staging_operation_count {
5349 let project_path = this
5350 .read_with(&cx, |this, cx| this.repo_path_to_project_path(&path, cx))
5351 .ok()
5352 .flatten();
5353 git_store
5354 .update(&mut cx, |git_store, cx| {
5355 let buffer_id = git_store
5356 .buffer_store
5357 .read(cx)
5358 .get_by_path(&project_path?)?
5359 .read(cx)
5360 .remote_id();
5361 let diff_state = git_store.diffs.get(&buffer_id)?;
5362 diff_state.update(cx, |diff_state, _| {
5363 diff_state.hunk_staging_operation_count_as_of_write =
5364 hunk_staging_operation_count;
5365 });
5366 Some(())
5367 })
5368 .context("Git store dropped")?;
5369 }
5370 Ok(())
5371 },
5372 )
5373 }
5374
5375 pub fn create_remote(
5376 &mut self,
5377 remote_name: String,
5378 remote_url: String,
5379 ) -> oneshot::Receiver<Result<()>> {
5380 let id = self.id;
5381 self.send_job(
5382 Some(format!("git remote add {remote_name} {remote_url}").into()),
5383 move |repo, _cx| async move {
5384 match repo {
5385 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5386 backend.create_remote(remote_name, remote_url).await
5387 }
5388 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5389 client
5390 .request(proto::GitCreateRemote {
5391 project_id: project_id.0,
5392 repository_id: id.to_proto(),
5393 remote_name,
5394 remote_url,
5395 })
5396 .await?;
5397
5398 Ok(())
5399 }
5400 }
5401 },
5402 )
5403 }
5404
5405 pub fn remove_remote(&mut self, remote_name: String) -> oneshot::Receiver<Result<()>> {
5406 let id = self.id;
5407 self.send_job(
5408 Some(format!("git remove remote {remote_name}").into()),
5409 move |repo, _cx| async move {
5410 match repo {
5411 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5412 backend.remove_remote(remote_name).await
5413 }
5414 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5415 client
5416 .request(proto::GitRemoveRemote {
5417 project_id: project_id.0,
5418 repository_id: id.to_proto(),
5419 remote_name,
5420 })
5421 .await?;
5422
5423 Ok(())
5424 }
5425 }
5426 },
5427 )
5428 }
5429
5430 pub fn get_remotes(
5431 &mut self,
5432 branch_name: Option<String>,
5433 is_push: bool,
5434 ) -> oneshot::Receiver<Result<Vec<Remote>>> {
5435 let id = self.id;
5436 self.send_job(None, move |repo, _cx| async move {
5437 match repo {
5438 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5439 let remote = if let Some(branch_name) = branch_name {
5440 if is_push {
5441 backend.get_push_remote(branch_name).await?
5442 } else {
5443 backend.get_branch_remote(branch_name).await?
5444 }
5445 } else {
5446 None
5447 };
5448
5449 match remote {
5450 Some(remote) => Ok(vec![remote]),
5451 None => backend.get_all_remotes().await,
5452 }
5453 }
5454 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5455 let response = client
5456 .request(proto::GetRemotes {
5457 project_id: project_id.0,
5458 repository_id: id.to_proto(),
5459 branch_name,
5460 is_push,
5461 })
5462 .await?;
5463
5464 let remotes = response
5465 .remotes
5466 .into_iter()
5467 .map(|remotes| Remote {
5468 name: remotes.name.into(),
5469 })
5470 .collect();
5471
5472 Ok(remotes)
5473 }
5474 }
5475 })
5476 }
5477
5478 pub fn branches(&mut self) -> oneshot::Receiver<Result<Vec<Branch>>> {
5479 let id = self.id;
5480 self.send_job(None, move |repo, _| async move {
5481 match repo {
5482 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5483 backend.branches().await
5484 }
5485 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5486 let response = client
5487 .request(proto::GitGetBranches {
5488 project_id: project_id.0,
5489 repository_id: id.to_proto(),
5490 })
5491 .await?;
5492
5493 let branches = response
5494 .branches
5495 .into_iter()
5496 .map(|branch| proto_to_branch(&branch))
5497 .collect();
5498
5499 Ok(branches)
5500 }
5501 }
5502 })
5503 }
5504
5505 pub fn worktrees(&mut self) -> oneshot::Receiver<Result<Vec<GitWorktree>>> {
5506 let id = self.id;
5507 self.send_job(None, move |repo, _| async move {
5508 match repo {
5509 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5510 backend.worktrees().await
5511 }
5512 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5513 let response = client
5514 .request(proto::GitGetWorktrees {
5515 project_id: project_id.0,
5516 repository_id: id.to_proto(),
5517 })
5518 .await?;
5519
5520 let worktrees = response
5521 .worktrees
5522 .into_iter()
5523 .map(|worktree| proto_to_worktree(&worktree))
5524 .collect();
5525
5526 Ok(worktrees)
5527 }
5528 }
5529 })
5530 }
5531
5532 pub fn create_worktree(
5533 &mut self,
5534 name: String,
5535 path: PathBuf,
5536 commit: Option<String>,
5537 ) -> oneshot::Receiver<Result<()>> {
5538 let id = self.id;
5539 self.send_job(
5540 Some("git worktree add".into()),
5541 move |repo, _cx| async move {
5542 match repo {
5543 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5544 backend.create_worktree(name, path, commit).await
5545 }
5546 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5547 client
5548 .request(proto::GitCreateWorktree {
5549 project_id: project_id.0,
5550 repository_id: id.to_proto(),
5551 name,
5552 directory: path.to_string_lossy().to_string(),
5553 commit,
5554 })
5555 .await?;
5556
5557 Ok(())
5558 }
5559 }
5560 },
5561 )
5562 }
5563
5564 pub fn default_branch(
5565 &mut self,
5566 include_remote_name: bool,
5567 ) -> oneshot::Receiver<Result<Option<SharedString>>> {
5568 let id = self.id;
5569 self.send_job(None, move |repo, _| async move {
5570 match repo {
5571 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5572 backend.default_branch(include_remote_name).await
5573 }
5574 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5575 let response = client
5576 .request(proto::GetDefaultBranch {
5577 project_id: project_id.0,
5578 repository_id: id.to_proto(),
5579 })
5580 .await?;
5581
5582 anyhow::Ok(response.branch.map(SharedString::from))
5583 }
5584 }
5585 })
5586 }
5587
5588 pub fn diff_tree(
5589 &mut self,
5590 diff_type: DiffTreeType,
5591 _cx: &App,
5592 ) -> oneshot::Receiver<Result<TreeDiff>> {
5593 let repository_id = self.snapshot.id;
5594 self.send_job(None, move |repo, _cx| async move {
5595 match repo {
5596 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5597 backend.diff_tree(diff_type).await
5598 }
5599 RepositoryState::Remote(RemoteRepositoryState { client, project_id }) => {
5600 let response = client
5601 .request(proto::GetTreeDiff {
5602 project_id: project_id.0,
5603 repository_id: repository_id.0,
5604 is_merge: matches!(diff_type, DiffTreeType::MergeBase { .. }),
5605 base: diff_type.base().to_string(),
5606 head: diff_type.head().to_string(),
5607 })
5608 .await?;
5609
5610 let entries = response
5611 .entries
5612 .into_iter()
5613 .filter_map(|entry| {
5614 let status = match entry.status() {
5615 proto::tree_diff_status::Status::Added => TreeDiffStatus::Added,
5616 proto::tree_diff_status::Status::Modified => {
5617 TreeDiffStatus::Modified {
5618 old: git::Oid::from_str(
5619 &entry.oid.context("missing oid").log_err()?,
5620 )
5621 .log_err()?,
5622 }
5623 }
5624 proto::tree_diff_status::Status::Deleted => {
5625 TreeDiffStatus::Deleted {
5626 old: git::Oid::from_str(
5627 &entry.oid.context("missing oid").log_err()?,
5628 )
5629 .log_err()?,
5630 }
5631 }
5632 };
5633 Some((
5634 RepoPath::from_rel_path(
5635 &RelPath::from_proto(&entry.path).log_err()?,
5636 ),
5637 status,
5638 ))
5639 })
5640 .collect();
5641
5642 Ok(TreeDiff { entries })
5643 }
5644 }
5645 })
5646 }
5647
5648 pub fn diff(&mut self, diff_type: DiffType, _cx: &App) -> oneshot::Receiver<Result<String>> {
5649 let id = self.id;
5650 self.send_job(None, move |repo, _cx| async move {
5651 match repo {
5652 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5653 backend.diff(diff_type).await
5654 }
5655 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5656 let response = client
5657 .request(proto::GitDiff {
5658 project_id: project_id.0,
5659 repository_id: id.to_proto(),
5660 diff_type: match diff_type {
5661 DiffType::HeadToIndex => {
5662 proto::git_diff::DiffType::HeadToIndex.into()
5663 }
5664 DiffType::HeadToWorktree => {
5665 proto::git_diff::DiffType::HeadToWorktree.into()
5666 }
5667 },
5668 })
5669 .await?;
5670
5671 Ok(response.diff)
5672 }
5673 }
5674 })
5675 }
5676
5677 pub fn create_branch(
5678 &mut self,
5679 branch_name: String,
5680 base_branch: Option<String>,
5681 ) -> oneshot::Receiver<Result<()>> {
5682 let id = self.id;
5683 let status_msg = if let Some(ref base) = base_branch {
5684 format!("git switch -c {branch_name} {base}").into()
5685 } else {
5686 format!("git switch -c {branch_name}").into()
5687 };
5688 self.send_job(Some(status_msg), move |repo, _cx| async move {
5689 match repo {
5690 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5691 backend.create_branch(branch_name, base_branch).await
5692 }
5693 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5694 client
5695 .request(proto::GitCreateBranch {
5696 project_id: project_id.0,
5697 repository_id: id.to_proto(),
5698 branch_name,
5699 })
5700 .await?;
5701
5702 Ok(())
5703 }
5704 }
5705 })
5706 }
5707
5708 pub fn change_branch(&mut self, branch_name: String) -> oneshot::Receiver<Result<()>> {
5709 let id = self.id;
5710 self.send_job(
5711 Some(format!("git switch {branch_name}").into()),
5712 move |repo, _cx| async move {
5713 match repo {
5714 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5715 backend.change_branch(branch_name).await
5716 }
5717 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5718 client
5719 .request(proto::GitChangeBranch {
5720 project_id: project_id.0,
5721 repository_id: id.to_proto(),
5722 branch_name,
5723 })
5724 .await?;
5725
5726 Ok(())
5727 }
5728 }
5729 },
5730 )
5731 }
5732
5733 pub fn delete_branch(&mut self, branch_name: String) -> oneshot::Receiver<Result<()>> {
5734 let id = self.id;
5735 self.send_job(
5736 Some(format!("git branch -d {branch_name}").into()),
5737 move |repo, _cx| async move {
5738 match repo {
5739 RepositoryState::Local(state) => state.backend.delete_branch(branch_name).await,
5740 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5741 client
5742 .request(proto::GitDeleteBranch {
5743 project_id: project_id.0,
5744 repository_id: id.to_proto(),
5745 branch_name,
5746 })
5747 .await?;
5748
5749 Ok(())
5750 }
5751 }
5752 },
5753 )
5754 }
5755
5756 pub fn rename_branch(
5757 &mut self,
5758 branch: String,
5759 new_name: String,
5760 ) -> oneshot::Receiver<Result<()>> {
5761 let id = self.id;
5762 self.send_job(
5763 Some(format!("git branch -m {branch} {new_name}").into()),
5764 move |repo, _cx| async move {
5765 match repo {
5766 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5767 backend.rename_branch(branch, new_name).await
5768 }
5769 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5770 client
5771 .request(proto::GitRenameBranch {
5772 project_id: project_id.0,
5773 repository_id: id.to_proto(),
5774 branch,
5775 new_name,
5776 })
5777 .await?;
5778
5779 Ok(())
5780 }
5781 }
5782 },
5783 )
5784 }
5785
5786 pub fn check_for_pushed_commits(&mut self) -> oneshot::Receiver<Result<Vec<SharedString>>> {
5787 let id = self.id;
5788 self.send_job(None, move |repo, _cx| async move {
5789 match repo {
5790 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5791 backend.check_for_pushed_commit().await
5792 }
5793 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
5794 let response = client
5795 .request(proto::CheckForPushedCommits {
5796 project_id: project_id.0,
5797 repository_id: id.to_proto(),
5798 })
5799 .await?;
5800
5801 let branches = response.pushed_to.into_iter().map(Into::into).collect();
5802
5803 Ok(branches)
5804 }
5805 }
5806 })
5807 }
5808
5809 pub fn checkpoint(&mut self) -> oneshot::Receiver<Result<GitRepositoryCheckpoint>> {
5810 self.send_job(None, |repo, _cx| async move {
5811 match repo {
5812 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5813 backend.checkpoint().await
5814 }
5815 RepositoryState::Remote(..) => anyhow::bail!("not implemented yet"),
5816 }
5817 })
5818 }
5819
5820 pub fn restore_checkpoint(
5821 &mut self,
5822 checkpoint: GitRepositoryCheckpoint,
5823 ) -> oneshot::Receiver<Result<()>> {
5824 self.send_job(None, move |repo, _cx| async move {
5825 match repo {
5826 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5827 backend.restore_checkpoint(checkpoint).await
5828 }
5829 RepositoryState::Remote { .. } => anyhow::bail!("not implemented yet"),
5830 }
5831 })
5832 }
5833
5834 pub(crate) fn apply_remote_update(
5835 &mut self,
5836 update: proto::UpdateRepository,
5837 cx: &mut Context<Self>,
5838 ) -> Result<()> {
5839 let conflicted_paths = TreeSet::from_ordered_entries(
5840 update
5841 .current_merge_conflicts
5842 .into_iter()
5843 .filter_map(|path| RepoPath::from_proto(&path).log_err()),
5844 );
5845 let new_branch = update.branch_summary.as_ref().map(proto_to_branch);
5846 let new_head_commit = update
5847 .head_commit_details
5848 .as_ref()
5849 .map(proto_to_commit_details);
5850 if self.snapshot.branch != new_branch || self.snapshot.head_commit != new_head_commit {
5851 cx.emit(RepositoryEvent::BranchChanged)
5852 }
5853 self.snapshot.branch = new_branch;
5854 self.snapshot.head_commit = new_head_commit;
5855
5856 self.snapshot.merge.conflicted_paths = conflicted_paths;
5857 self.snapshot.merge.message = update.merge_message.map(SharedString::from);
5858 let new_stash_entries = GitStash {
5859 entries: update
5860 .stash_entries
5861 .iter()
5862 .filter_map(|entry| proto_to_stash(entry).ok())
5863 .collect(),
5864 };
5865 if self.snapshot.stash_entries != new_stash_entries {
5866 cx.emit(RepositoryEvent::StashEntriesChanged)
5867 }
5868 self.snapshot.stash_entries = new_stash_entries;
5869 self.snapshot.remote_upstream_url = update.remote_upstream_url;
5870 self.snapshot.remote_origin_url = update.remote_origin_url;
5871
5872 let edits = update
5873 .removed_statuses
5874 .into_iter()
5875 .filter_map(|path| {
5876 Some(sum_tree::Edit::Remove(PathKey(
5877 RelPath::from_proto(&path).log_err()?,
5878 )))
5879 })
5880 .chain(
5881 update
5882 .updated_statuses
5883 .into_iter()
5884 .filter_map(|updated_status| {
5885 Some(sum_tree::Edit::Insert(updated_status.try_into().log_err()?))
5886 }),
5887 )
5888 .collect::<Vec<_>>();
5889 if !edits.is_empty() {
5890 cx.emit(RepositoryEvent::StatusesChanged);
5891 }
5892 self.snapshot.statuses_by_path.edit(edits, ());
5893 if update.is_last_update {
5894 self.snapshot.scan_id = update.scan_id;
5895 }
5896 self.clear_pending_ops(cx);
5897 Ok(())
5898 }
5899
5900 pub fn compare_checkpoints(
5901 &mut self,
5902 left: GitRepositoryCheckpoint,
5903 right: GitRepositoryCheckpoint,
5904 ) -> oneshot::Receiver<Result<bool>> {
5905 self.send_job(None, move |repo, _cx| async move {
5906 match repo {
5907 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5908 backend.compare_checkpoints(left, right).await
5909 }
5910 RepositoryState::Remote { .. } => anyhow::bail!("not implemented yet"),
5911 }
5912 })
5913 }
5914
5915 pub fn diff_checkpoints(
5916 &mut self,
5917 base_checkpoint: GitRepositoryCheckpoint,
5918 target_checkpoint: GitRepositoryCheckpoint,
5919 ) -> oneshot::Receiver<Result<String>> {
5920 self.send_job(None, move |repo, _cx| async move {
5921 match repo {
5922 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
5923 backend
5924 .diff_checkpoints(base_checkpoint, target_checkpoint)
5925 .await
5926 }
5927 RepositoryState::Remote { .. } => anyhow::bail!("not implemented yet"),
5928 }
5929 })
5930 }
5931
5932 fn clear_pending_ops(&mut self, cx: &mut Context<Self>) {
5933 let updated = SumTree::from_iter(
5934 self.pending_ops.iter().filter_map(|ops| {
5935 let inner_ops: Vec<PendingOp> =
5936 ops.ops.iter().filter(|op| op.running()).cloned().collect();
5937 if inner_ops.is_empty() {
5938 None
5939 } else {
5940 Some(PendingOps {
5941 repo_path: ops.repo_path.clone(),
5942 ops: inner_ops,
5943 })
5944 }
5945 }),
5946 (),
5947 );
5948
5949 if updated != self.pending_ops {
5950 cx.emit(RepositoryEvent::PendingOpsChanged {
5951 pending_ops: self.pending_ops.clone(),
5952 })
5953 }
5954
5955 self.pending_ops = updated;
5956 }
5957
5958 fn schedule_scan(
5959 &mut self,
5960 updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
5961 cx: &mut Context<Self>,
5962 ) {
5963 let this = cx.weak_entity();
5964 let _ = self.send_keyed_job(
5965 Some(GitJobKey::ReloadGitState),
5966 None,
5967 |state, mut cx| async move {
5968 log::debug!("run scheduled git status scan");
5969
5970 let Some(this) = this.upgrade() else {
5971 return Ok(());
5972 };
5973 let RepositoryState::Local(LocalRepositoryState { backend, .. }) = state else {
5974 bail!("not a local repository")
5975 };
5976 let (snapshot, events) = this
5977 .update(&mut cx, |this, _| {
5978 this.paths_needing_status_update.clear();
5979 compute_snapshot(
5980 this.id,
5981 this.work_directory_abs_path.clone(),
5982 this.snapshot.clone(),
5983 backend.clone(),
5984 )
5985 })
5986 .await?;
5987 this.update(&mut cx, |this, cx| {
5988 this.snapshot = snapshot.clone();
5989 this.clear_pending_ops(cx);
5990 for event in events {
5991 cx.emit(event);
5992 }
5993 });
5994 if let Some(updates_tx) = updates_tx {
5995 updates_tx
5996 .unbounded_send(DownstreamUpdate::UpdateRepository(snapshot))
5997 .ok();
5998 }
5999 Ok(())
6000 },
6001 );
6002 }
6003
6004 fn spawn_local_git_worker(
6005 state: Shared<Task<Result<LocalRepositoryState, String>>>,
6006 cx: &mut Context<Self>,
6007 ) -> mpsc::UnboundedSender<GitJob> {
6008 let (job_tx, mut job_rx) = mpsc::unbounded::<GitJob>();
6009
6010 cx.spawn(async move |_, cx| {
6011 let state = state.await.map_err(|err| anyhow::anyhow!(err))?;
6012 if let Some(git_hosting_provider_registry) =
6013 cx.update(|cx| GitHostingProviderRegistry::try_global(cx))
6014 {
6015 git_hosting_providers::register_additional_providers(
6016 git_hosting_provider_registry,
6017 state.backend.clone(),
6018 )
6019 .await;
6020 }
6021 let state = RepositoryState::Local(state);
6022 let mut jobs = VecDeque::new();
6023 loop {
6024 while let Ok(Some(next_job)) = job_rx.try_next() {
6025 jobs.push_back(next_job);
6026 }
6027
6028 if let Some(job) = jobs.pop_front() {
6029 if let Some(current_key) = &job.key
6030 && jobs
6031 .iter()
6032 .any(|other_job| other_job.key.as_ref() == Some(current_key))
6033 {
6034 continue;
6035 }
6036 (job.job)(state.clone(), cx).await;
6037 } else if let Some(job) = job_rx.next().await {
6038 jobs.push_back(job);
6039 } else {
6040 break;
6041 }
6042 }
6043 anyhow::Ok(())
6044 })
6045 .detach_and_log_err(cx);
6046
6047 job_tx
6048 }
6049
6050 fn spawn_remote_git_worker(
6051 state: RemoteRepositoryState,
6052 cx: &mut Context<Self>,
6053 ) -> mpsc::UnboundedSender<GitJob> {
6054 let (job_tx, mut job_rx) = mpsc::unbounded::<GitJob>();
6055
6056 cx.spawn(async move |_, cx| {
6057 let state = RepositoryState::Remote(state);
6058 let mut jobs = VecDeque::new();
6059 loop {
6060 while let Ok(Some(next_job)) = job_rx.try_next() {
6061 jobs.push_back(next_job);
6062 }
6063
6064 if let Some(job) = jobs.pop_front() {
6065 if let Some(current_key) = &job.key
6066 && jobs
6067 .iter()
6068 .any(|other_job| other_job.key.as_ref() == Some(current_key))
6069 {
6070 continue;
6071 }
6072 (job.job)(state.clone(), cx).await;
6073 } else if let Some(job) = job_rx.next().await {
6074 jobs.push_back(job);
6075 } else {
6076 break;
6077 }
6078 }
6079 anyhow::Ok(())
6080 })
6081 .detach_and_log_err(cx);
6082
6083 job_tx
6084 }
6085
6086 fn load_staged_text(
6087 &mut self,
6088 buffer_id: BufferId,
6089 repo_path: RepoPath,
6090 cx: &App,
6091 ) -> Task<Result<Option<String>>> {
6092 let rx = self.send_job(None, move |state, _| async move {
6093 match state {
6094 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
6095 anyhow::Ok(backend.load_index_text(repo_path).await)
6096 }
6097 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
6098 let response = client
6099 .request(proto::OpenUnstagedDiff {
6100 project_id: project_id.to_proto(),
6101 buffer_id: buffer_id.to_proto(),
6102 })
6103 .await?;
6104 Ok(response.staged_text)
6105 }
6106 }
6107 });
6108 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
6109 }
6110
6111 fn load_committed_text(
6112 &mut self,
6113 buffer_id: BufferId,
6114 repo_path: RepoPath,
6115 cx: &App,
6116 ) -> Task<Result<DiffBasesChange>> {
6117 let rx = self.send_job(None, move |state, _| async move {
6118 match state {
6119 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
6120 let committed_text = backend.load_committed_text(repo_path.clone()).await;
6121 let staged_text = backend.load_index_text(repo_path).await;
6122 let diff_bases_change = if committed_text == staged_text {
6123 DiffBasesChange::SetBoth(committed_text)
6124 } else {
6125 DiffBasesChange::SetEach {
6126 index: staged_text,
6127 head: committed_text,
6128 }
6129 };
6130 anyhow::Ok(diff_bases_change)
6131 }
6132 RepositoryState::Remote(RemoteRepositoryState { project_id, client }) => {
6133 use proto::open_uncommitted_diff_response::Mode;
6134
6135 let response = client
6136 .request(proto::OpenUncommittedDiff {
6137 project_id: project_id.to_proto(),
6138 buffer_id: buffer_id.to_proto(),
6139 })
6140 .await?;
6141 let mode = Mode::from_i32(response.mode).context("Invalid mode")?;
6142 let bases = match mode {
6143 Mode::IndexMatchesHead => DiffBasesChange::SetBoth(response.committed_text),
6144 Mode::IndexAndHead => DiffBasesChange::SetEach {
6145 head: response.committed_text,
6146 index: response.staged_text,
6147 },
6148 };
6149 Ok(bases)
6150 }
6151 }
6152 });
6153
6154 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
6155 }
6156
6157 fn load_blob_content(&mut self, oid: Oid, cx: &App) -> Task<Result<String>> {
6158 let repository_id = self.snapshot.id;
6159 let rx = self.send_job(None, move |state, _| async move {
6160 match state {
6161 RepositoryState::Local(LocalRepositoryState { backend, .. }) => {
6162 backend.load_blob_content(oid).await
6163 }
6164 RepositoryState::Remote(RemoteRepositoryState { client, project_id }) => {
6165 let response = client
6166 .request(proto::GetBlobContent {
6167 project_id: project_id.to_proto(),
6168 repository_id: repository_id.0,
6169 oid: oid.to_string(),
6170 })
6171 .await?;
6172 Ok(response.content)
6173 }
6174 }
6175 });
6176 cx.spawn(|_: &mut AsyncApp| async move { rx.await? })
6177 }
6178
6179 fn paths_changed(
6180 &mut self,
6181 paths: Vec<RepoPath>,
6182 updates_tx: Option<mpsc::UnboundedSender<DownstreamUpdate>>,
6183 cx: &mut Context<Self>,
6184 ) {
6185 self.paths_needing_status_update.extend(paths);
6186
6187 let this = cx.weak_entity();
6188 let _ = self.send_keyed_job(
6189 Some(GitJobKey::RefreshStatuses),
6190 None,
6191 |state, mut cx| async move {
6192 let (prev_snapshot, mut changed_paths) = this.update(&mut cx, |this, _| {
6193 (
6194 this.snapshot.clone(),
6195 mem::take(&mut this.paths_needing_status_update),
6196 )
6197 })?;
6198 let RepositoryState::Local(LocalRepositoryState { backend, .. }) = state else {
6199 bail!("not a local repository")
6200 };
6201
6202 let paths = changed_paths.iter().cloned().collect::<Vec<_>>();
6203 if paths.is_empty() {
6204 return Ok(());
6205 }
6206 let statuses = backend.status(&paths).await?;
6207 let stash_entries = backend.stash_entries().await?;
6208
6209 let changed_path_statuses = cx
6210 .background_spawn(async move {
6211 let mut changed_path_statuses = Vec::new();
6212 let prev_statuses = prev_snapshot.statuses_by_path.clone();
6213 let mut cursor = prev_statuses.cursor::<PathProgress>(());
6214
6215 for (repo_path, status) in &*statuses.entries {
6216 changed_paths.remove(repo_path);
6217 if cursor.seek_forward(&PathTarget::Path(repo_path), Bias::Left)
6218 && cursor.item().is_some_and(|entry| entry.status == *status)
6219 {
6220 continue;
6221 }
6222
6223 changed_path_statuses.push(Edit::Insert(StatusEntry {
6224 repo_path: repo_path.clone(),
6225 status: *status,
6226 }));
6227 }
6228 let mut cursor = prev_statuses.cursor::<PathProgress>(());
6229 for path in changed_paths.into_iter() {
6230 if cursor.seek_forward(&PathTarget::Path(&path), Bias::Left) {
6231 changed_path_statuses
6232 .push(Edit::Remove(PathKey(path.as_ref().clone())));
6233 }
6234 }
6235 changed_path_statuses
6236 })
6237 .await;
6238
6239 this.update(&mut cx, |this, cx| {
6240 if this.snapshot.stash_entries != stash_entries {
6241 cx.emit(RepositoryEvent::StashEntriesChanged);
6242 this.snapshot.stash_entries = stash_entries;
6243 }
6244
6245 if !changed_path_statuses.is_empty() {
6246 cx.emit(RepositoryEvent::StatusesChanged);
6247 this.snapshot
6248 .statuses_by_path
6249 .edit(changed_path_statuses, ());
6250 this.snapshot.scan_id += 1;
6251 }
6252
6253 if let Some(updates_tx) = updates_tx {
6254 updates_tx
6255 .unbounded_send(DownstreamUpdate::UpdateRepository(
6256 this.snapshot.clone(),
6257 ))
6258 .ok();
6259 }
6260 })
6261 },
6262 );
6263 }
6264
6265 /// currently running git command and when it started
6266 pub fn current_job(&self) -> Option<JobInfo> {
6267 self.active_jobs.values().next().cloned()
6268 }
6269
6270 pub fn barrier(&mut self) -> oneshot::Receiver<()> {
6271 self.send_job(None, |_, _| async {})
6272 }
6273
6274 fn spawn_job_with_tracking<AsyncFn>(
6275 &mut self,
6276 paths: Vec<RepoPath>,
6277 git_status: pending_op::GitStatus,
6278 cx: &mut Context<Self>,
6279 f: AsyncFn,
6280 ) -> Task<Result<()>>
6281 where
6282 AsyncFn: AsyncFnOnce(WeakEntity<Repository>, &mut AsyncApp) -> Result<()> + 'static,
6283 {
6284 let ids = self.new_pending_ops_for_paths(paths, git_status);
6285
6286 cx.spawn(async move |this, cx| {
6287 let (job_status, result) = match f(this.clone(), cx).await {
6288 Ok(()) => (pending_op::JobStatus::Finished, Ok(())),
6289 Err(err) if err.is::<Canceled>() => (pending_op::JobStatus::Skipped, Ok(())),
6290 Err(err) => (pending_op::JobStatus::Error, Err(err)),
6291 };
6292
6293 this.update(cx, |this, _| {
6294 let mut edits = Vec::with_capacity(ids.len());
6295 for (id, entry) in ids {
6296 if let Some(mut ops) = this
6297 .pending_ops
6298 .get(&PathKey(entry.as_ref().clone()), ())
6299 .cloned()
6300 {
6301 if let Some(op) = ops.op_by_id_mut(id) {
6302 op.job_status = job_status;
6303 }
6304 edits.push(sum_tree::Edit::Insert(ops));
6305 }
6306 }
6307 this.pending_ops.edit(edits, ());
6308 })?;
6309
6310 result
6311 })
6312 }
6313
6314 fn new_pending_ops_for_paths(
6315 &mut self,
6316 paths: Vec<RepoPath>,
6317 git_status: pending_op::GitStatus,
6318 ) -> Vec<(PendingOpId, RepoPath)> {
6319 let mut edits = Vec::with_capacity(paths.len());
6320 let mut ids = Vec::with_capacity(paths.len());
6321 for path in paths {
6322 let mut ops = self
6323 .pending_ops
6324 .get(&PathKey(path.as_ref().clone()), ())
6325 .cloned()
6326 .unwrap_or_else(|| PendingOps::new(&path));
6327 let id = ops.max_id() + 1;
6328 ops.ops.push(PendingOp {
6329 id,
6330 git_status,
6331 job_status: pending_op::JobStatus::Running,
6332 });
6333 edits.push(sum_tree::Edit::Insert(ops));
6334 ids.push((id, path));
6335 }
6336 self.pending_ops.edit(edits, ());
6337 ids
6338 }
6339 pub fn default_remote_url(&self) -> Option<String> {
6340 self.remote_upstream_url
6341 .clone()
6342 .or(self.remote_origin_url.clone())
6343 }
6344}
6345
6346fn get_permalink_in_rust_registry_src(
6347 provider_registry: Arc<GitHostingProviderRegistry>,
6348 path: PathBuf,
6349 selection: Range<u32>,
6350) -> Result<url::Url> {
6351 #[derive(Deserialize)]
6352 struct CargoVcsGit {
6353 sha1: String,
6354 }
6355
6356 #[derive(Deserialize)]
6357 struct CargoVcsInfo {
6358 git: CargoVcsGit,
6359 path_in_vcs: String,
6360 }
6361
6362 #[derive(Deserialize)]
6363 struct CargoPackage {
6364 repository: String,
6365 }
6366
6367 #[derive(Deserialize)]
6368 struct CargoToml {
6369 package: CargoPackage,
6370 }
6371
6372 let Some((dir, cargo_vcs_info_json)) = path.ancestors().skip(1).find_map(|dir| {
6373 let json = std::fs::read_to_string(dir.join(".cargo_vcs_info.json")).ok()?;
6374 Some((dir, json))
6375 }) else {
6376 bail!("No .cargo_vcs_info.json found in parent directories")
6377 };
6378 let cargo_vcs_info = serde_json::from_str::<CargoVcsInfo>(&cargo_vcs_info_json)?;
6379 let cargo_toml = std::fs::read_to_string(dir.join("Cargo.toml"))?;
6380 let manifest = toml::from_str::<CargoToml>(&cargo_toml)?;
6381 let (provider, remote) = parse_git_remote_url(provider_registry, &manifest.package.repository)
6382 .context("parsing package.repository field of manifest")?;
6383 let path = PathBuf::from(cargo_vcs_info.path_in_vcs).join(path.strip_prefix(dir).unwrap());
6384 let permalink = provider.build_permalink(
6385 remote,
6386 BuildPermalinkParams::new(
6387 &cargo_vcs_info.git.sha1,
6388 &RepoPath::from_rel_path(
6389 &RelPath::new(&path, PathStyle::local()).context("invalid path")?,
6390 ),
6391 Some(selection),
6392 ),
6393 );
6394 Ok(permalink)
6395}
6396
6397fn serialize_blame_buffer_response(blame: Option<git::blame::Blame>) -> proto::BlameBufferResponse {
6398 let Some(blame) = blame else {
6399 return proto::BlameBufferResponse {
6400 blame_response: None,
6401 };
6402 };
6403
6404 let entries = blame
6405 .entries
6406 .into_iter()
6407 .map(|entry| proto::BlameEntry {
6408 sha: entry.sha.as_bytes().into(),
6409 start_line: entry.range.start,
6410 end_line: entry.range.end,
6411 original_line_number: entry.original_line_number,
6412 author: entry.author,
6413 author_mail: entry.author_mail,
6414 author_time: entry.author_time,
6415 author_tz: entry.author_tz,
6416 committer: entry.committer_name,
6417 committer_mail: entry.committer_email,
6418 committer_time: entry.committer_time,
6419 committer_tz: entry.committer_tz,
6420 summary: entry.summary,
6421 previous: entry.previous,
6422 filename: entry.filename,
6423 })
6424 .collect::<Vec<_>>();
6425
6426 let messages = blame
6427 .messages
6428 .into_iter()
6429 .map(|(oid, message)| proto::CommitMessage {
6430 oid: oid.as_bytes().into(),
6431 message,
6432 })
6433 .collect::<Vec<_>>();
6434
6435 proto::BlameBufferResponse {
6436 blame_response: Some(proto::blame_buffer_response::BlameResponse { entries, messages }),
6437 }
6438}
6439
6440fn deserialize_blame_buffer_response(
6441 response: proto::BlameBufferResponse,
6442) -> Option<git::blame::Blame> {
6443 let response = response.blame_response?;
6444 let entries = response
6445 .entries
6446 .into_iter()
6447 .filter_map(|entry| {
6448 Some(git::blame::BlameEntry {
6449 sha: git::Oid::from_bytes(&entry.sha).ok()?,
6450 range: entry.start_line..entry.end_line,
6451 original_line_number: entry.original_line_number,
6452 committer_name: entry.committer,
6453 committer_time: entry.committer_time,
6454 committer_tz: entry.committer_tz,
6455 committer_email: entry.committer_mail,
6456 author: entry.author,
6457 author_mail: entry.author_mail,
6458 author_time: entry.author_time,
6459 author_tz: entry.author_tz,
6460 summary: entry.summary,
6461 previous: entry.previous,
6462 filename: entry.filename,
6463 })
6464 })
6465 .collect::<Vec<_>>();
6466
6467 let messages = response
6468 .messages
6469 .into_iter()
6470 .filter_map(|message| Some((git::Oid::from_bytes(&message.oid).ok()?, message.message)))
6471 .collect::<HashMap<_, _>>();
6472
6473 Some(Blame { entries, messages })
6474}
6475
6476fn branch_to_proto(branch: &git::repository::Branch) -> proto::Branch {
6477 proto::Branch {
6478 is_head: branch.is_head,
6479 ref_name: branch.ref_name.to_string(),
6480 unix_timestamp: branch
6481 .most_recent_commit
6482 .as_ref()
6483 .map(|commit| commit.commit_timestamp as u64),
6484 upstream: branch.upstream.as_ref().map(|upstream| proto::GitUpstream {
6485 ref_name: upstream.ref_name.to_string(),
6486 tracking: upstream
6487 .tracking
6488 .status()
6489 .map(|upstream| proto::UpstreamTracking {
6490 ahead: upstream.ahead as u64,
6491 behind: upstream.behind as u64,
6492 }),
6493 }),
6494 most_recent_commit: branch
6495 .most_recent_commit
6496 .as_ref()
6497 .map(|commit| proto::CommitSummary {
6498 sha: commit.sha.to_string(),
6499 subject: commit.subject.to_string(),
6500 commit_timestamp: commit.commit_timestamp,
6501 author_name: commit.author_name.to_string(),
6502 }),
6503 }
6504}
6505
6506fn worktree_to_proto(worktree: &git::repository::Worktree) -> proto::Worktree {
6507 proto::Worktree {
6508 path: worktree.path.to_string_lossy().to_string(),
6509 ref_name: worktree.ref_name.to_string(),
6510 sha: worktree.sha.to_string(),
6511 }
6512}
6513
6514fn proto_to_worktree(proto: &proto::Worktree) -> git::repository::Worktree {
6515 git::repository::Worktree {
6516 path: PathBuf::from(proto.path.clone()),
6517 ref_name: proto.ref_name.clone().into(),
6518 sha: proto.sha.clone().into(),
6519 }
6520}
6521
6522fn proto_to_branch(proto: &proto::Branch) -> git::repository::Branch {
6523 git::repository::Branch {
6524 is_head: proto.is_head,
6525 ref_name: proto.ref_name.clone().into(),
6526 upstream: proto
6527 .upstream
6528 .as_ref()
6529 .map(|upstream| git::repository::Upstream {
6530 ref_name: upstream.ref_name.to_string().into(),
6531 tracking: upstream
6532 .tracking
6533 .as_ref()
6534 .map(|tracking| {
6535 git::repository::UpstreamTracking::Tracked(UpstreamTrackingStatus {
6536 ahead: tracking.ahead as u32,
6537 behind: tracking.behind as u32,
6538 })
6539 })
6540 .unwrap_or(git::repository::UpstreamTracking::Gone),
6541 }),
6542 most_recent_commit: proto.most_recent_commit.as_ref().map(|commit| {
6543 git::repository::CommitSummary {
6544 sha: commit.sha.to_string().into(),
6545 subject: commit.subject.to_string().into(),
6546 commit_timestamp: commit.commit_timestamp,
6547 author_name: commit.author_name.to_string().into(),
6548 has_parent: true,
6549 }
6550 }),
6551 }
6552}
6553
6554fn commit_details_to_proto(commit: &CommitDetails) -> proto::GitCommitDetails {
6555 proto::GitCommitDetails {
6556 sha: commit.sha.to_string(),
6557 message: commit.message.to_string(),
6558 commit_timestamp: commit.commit_timestamp,
6559 author_email: commit.author_email.to_string(),
6560 author_name: commit.author_name.to_string(),
6561 }
6562}
6563
6564fn proto_to_commit_details(proto: &proto::GitCommitDetails) -> CommitDetails {
6565 CommitDetails {
6566 sha: proto.sha.clone().into(),
6567 message: proto.message.clone().into(),
6568 commit_timestamp: proto.commit_timestamp,
6569 author_email: proto.author_email.clone().into(),
6570 author_name: proto.author_name.clone().into(),
6571 }
6572}
6573
6574async fn compute_snapshot(
6575 id: RepositoryId,
6576 work_directory_abs_path: Arc<Path>,
6577 prev_snapshot: RepositorySnapshot,
6578 backend: Arc<dyn GitRepository>,
6579) -> Result<(RepositorySnapshot, Vec<RepositoryEvent>)> {
6580 let mut events = Vec::new();
6581 let branches = backend.branches().await?;
6582 let branch = branches.into_iter().find(|branch| branch.is_head);
6583 let statuses = backend
6584 .status(&[RepoPath::from_rel_path(
6585 &RelPath::new(".".as_ref(), PathStyle::local()).unwrap(),
6586 )])
6587 .await?;
6588 let stash_entries = backend.stash_entries().await?;
6589 let statuses_by_path = SumTree::from_iter(
6590 statuses
6591 .entries
6592 .iter()
6593 .map(|(repo_path, status)| StatusEntry {
6594 repo_path: repo_path.clone(),
6595 status: *status,
6596 }),
6597 (),
6598 );
6599 let (merge_details, merge_heads_changed) =
6600 MergeDetails::load(&backend, &statuses_by_path, &prev_snapshot).await?;
6601 log::debug!("new merge details (changed={merge_heads_changed:?}): {merge_details:?}");
6602
6603 if merge_heads_changed {
6604 events.push(RepositoryEvent::MergeHeadsChanged);
6605 }
6606
6607 if statuses_by_path != prev_snapshot.statuses_by_path {
6608 events.push(RepositoryEvent::StatusesChanged)
6609 }
6610
6611 // Useful when branch is None in detached head state
6612 let head_commit = match backend.head_sha().await {
6613 Some(head_sha) => backend.show(head_sha).await.log_err(),
6614 None => None,
6615 };
6616
6617 if branch != prev_snapshot.branch || head_commit != prev_snapshot.head_commit {
6618 events.push(RepositoryEvent::BranchChanged);
6619 }
6620
6621 let remote_origin_url = backend.remote_url("origin").await;
6622 let remote_upstream_url = backend.remote_url("upstream").await;
6623
6624 let snapshot = RepositorySnapshot {
6625 id,
6626 statuses_by_path,
6627 work_directory_abs_path,
6628 path_style: prev_snapshot.path_style,
6629 scan_id: prev_snapshot.scan_id + 1,
6630 branch,
6631 head_commit,
6632 merge: merge_details,
6633 remote_origin_url,
6634 remote_upstream_url,
6635 stash_entries,
6636 };
6637
6638 Ok((snapshot, events))
6639}
6640
6641fn status_from_proto(
6642 simple_status: i32,
6643 status: Option<proto::GitFileStatus>,
6644) -> anyhow::Result<FileStatus> {
6645 use proto::git_file_status::Variant;
6646
6647 let Some(variant) = status.and_then(|status| status.variant) else {
6648 let code = proto::GitStatus::from_i32(simple_status)
6649 .with_context(|| format!("Invalid git status code: {simple_status}"))?;
6650 let result = match code {
6651 proto::GitStatus::Added => TrackedStatus {
6652 worktree_status: StatusCode::Added,
6653 index_status: StatusCode::Unmodified,
6654 }
6655 .into(),
6656 proto::GitStatus::Modified => TrackedStatus {
6657 worktree_status: StatusCode::Modified,
6658 index_status: StatusCode::Unmodified,
6659 }
6660 .into(),
6661 proto::GitStatus::Conflict => UnmergedStatus {
6662 first_head: UnmergedStatusCode::Updated,
6663 second_head: UnmergedStatusCode::Updated,
6664 }
6665 .into(),
6666 proto::GitStatus::Deleted => TrackedStatus {
6667 worktree_status: StatusCode::Deleted,
6668 index_status: StatusCode::Unmodified,
6669 }
6670 .into(),
6671 _ => anyhow::bail!("Invalid code for simple status: {simple_status}"),
6672 };
6673 return Ok(result);
6674 };
6675
6676 let result = match variant {
6677 Variant::Untracked(_) => FileStatus::Untracked,
6678 Variant::Ignored(_) => FileStatus::Ignored,
6679 Variant::Unmerged(unmerged) => {
6680 let [first_head, second_head] =
6681 [unmerged.first_head, unmerged.second_head].map(|head| {
6682 let code = proto::GitStatus::from_i32(head)
6683 .with_context(|| format!("Invalid git status code: {head}"))?;
6684 let result = match code {
6685 proto::GitStatus::Added => UnmergedStatusCode::Added,
6686 proto::GitStatus::Updated => UnmergedStatusCode::Updated,
6687 proto::GitStatus::Deleted => UnmergedStatusCode::Deleted,
6688 _ => anyhow::bail!("Invalid code for unmerged status: {code:?}"),
6689 };
6690 Ok(result)
6691 });
6692 let [first_head, second_head] = [first_head?, second_head?];
6693 UnmergedStatus {
6694 first_head,
6695 second_head,
6696 }
6697 .into()
6698 }
6699 Variant::Tracked(tracked) => {
6700 let [index_status, worktree_status] = [tracked.index_status, tracked.worktree_status]
6701 .map(|status| {
6702 let code = proto::GitStatus::from_i32(status)
6703 .with_context(|| format!("Invalid git status code: {status}"))?;
6704 let result = match code {
6705 proto::GitStatus::Modified => StatusCode::Modified,
6706 proto::GitStatus::TypeChanged => StatusCode::TypeChanged,
6707 proto::GitStatus::Added => StatusCode::Added,
6708 proto::GitStatus::Deleted => StatusCode::Deleted,
6709 proto::GitStatus::Renamed => StatusCode::Renamed,
6710 proto::GitStatus::Copied => StatusCode::Copied,
6711 proto::GitStatus::Unmodified => StatusCode::Unmodified,
6712 _ => anyhow::bail!("Invalid code for tracked status: {code:?}"),
6713 };
6714 Ok(result)
6715 });
6716 let [index_status, worktree_status] = [index_status?, worktree_status?];
6717 TrackedStatus {
6718 index_status,
6719 worktree_status,
6720 }
6721 .into()
6722 }
6723 };
6724 Ok(result)
6725}
6726
6727fn status_to_proto(status: FileStatus) -> proto::GitFileStatus {
6728 use proto::git_file_status::{Tracked, Unmerged, Variant};
6729
6730 let variant = match status {
6731 FileStatus::Untracked => Variant::Untracked(Default::default()),
6732 FileStatus::Ignored => Variant::Ignored(Default::default()),
6733 FileStatus::Unmerged(UnmergedStatus {
6734 first_head,
6735 second_head,
6736 }) => Variant::Unmerged(Unmerged {
6737 first_head: unmerged_status_to_proto(first_head),
6738 second_head: unmerged_status_to_proto(second_head),
6739 }),
6740 FileStatus::Tracked(TrackedStatus {
6741 index_status,
6742 worktree_status,
6743 }) => Variant::Tracked(Tracked {
6744 index_status: tracked_status_to_proto(index_status),
6745 worktree_status: tracked_status_to_proto(worktree_status),
6746 }),
6747 };
6748 proto::GitFileStatus {
6749 variant: Some(variant),
6750 }
6751}
6752
6753fn unmerged_status_to_proto(code: UnmergedStatusCode) -> i32 {
6754 match code {
6755 UnmergedStatusCode::Added => proto::GitStatus::Added as _,
6756 UnmergedStatusCode::Deleted => proto::GitStatus::Deleted as _,
6757 UnmergedStatusCode::Updated => proto::GitStatus::Updated as _,
6758 }
6759}
6760
6761fn tracked_status_to_proto(code: StatusCode) -> i32 {
6762 match code {
6763 StatusCode::Added => proto::GitStatus::Added as _,
6764 StatusCode::Deleted => proto::GitStatus::Deleted as _,
6765 StatusCode::Modified => proto::GitStatus::Modified as _,
6766 StatusCode::Renamed => proto::GitStatus::Renamed as _,
6767 StatusCode::TypeChanged => proto::GitStatus::TypeChanged as _,
6768 StatusCode::Copied => proto::GitStatus::Copied as _,
6769 StatusCode::Unmodified => proto::GitStatus::Unmodified as _,
6770 }
6771}