diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 986330e118de9eaf4abff99357d7d865ba302a94..1294b06c8e553f17fdc826f6bef07318ecd0fc9c 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -436,6 +436,7 @@ impl Server { .add_request_handler(forward_mutating_project_request::) .add_request_handler(forward_read_only_project_request::) .add_request_handler(forward_read_only_project_request::) + .add_request_handler(forward_read_only_project_request::) .add_request_handler(forward_mutating_project_request::) .add_request_handler(disallow_guest_request::) .add_request_handler(disallow_guest_request::) diff --git a/crates/collab/tests/integration/git_tests.rs b/crates/collab/tests/integration/git_tests.rs index b8248ce00214be868e6f1285027d2719b377747b..cfa142ab0055474c7b498629b3e5c4ede6371e39 100644 --- a/crates/collab/tests/integration/git_tests.rs +++ b/crates/collab/tests/integration/git_tests.rs @@ -4,12 +4,16 @@ use call::ActiveCall; use client::RECEIVE_TIMEOUT; use collections::HashMap; use git::{ - repository::{RepoPath, Worktree as GitWorktree}, + Oid, + repository::{CommitData, RepoPath, Worktree as GitWorktree}, status::{DiffStat, FileStatus, StatusCode, TrackedStatus}, }; use git_ui::{git_panel::GitPanel, project_diff::ProjectDiff}; -use gpui::{AppContext as _, BackgroundExecutor, TestAppContext, VisualTestContext}; -use project::ProjectPath; +use gpui::{AppContext as _, BackgroundExecutor, SharedString, TestAppContext, VisualTestContext}; +use project::{ + ProjectPath, + git_store::{CommitDataState, Repository}, +}; use serde_json::json; use util::{path, rel_path::rel_path}; @@ -91,6 +95,90 @@ fn collect_diff_stats( }) } +async fn load_commit_data_batch( + repository: &gpui::Entity, + shas: &[Oid], + executor: &BackgroundExecutor, + cx: &mut TestAppContext, +) -> HashMap { + let states = cx.update(|cx| { + shas.iter() + .map(|sha| { + ( + *sha, + repository.update(cx, |repository, cx| { + repository.fetch_commit_data(*sha, true, cx).clone() + }), + ) + }) + .collect::>() + }); + + executor.run_until_parked(); + + let mut commit_data = HashMap::default(); + for (sha, state) in states { + let data = match state { + CommitDataState::Loaded(data) => data.as_ref().clone(), + CommitDataState::Loading(Some(shared)) => shared.await.unwrap().as_ref().clone(), + CommitDataState::Loading(None) => { + panic!("fetch_commit_data(..., true) should return an await-result state") + } + }; + commit_data.insert(sha, data); + } + + commit_data +} + +fn assert_remote_cache_matches_local_cache( + local_repository: &gpui::Entity, + remote_repository: &gpui::Entity, + cx_local: &mut TestAppContext, + cx_remote: &mut TestAppContext, +) { + let local_cache = cx_local.update(|cx| { + local_repository.update(cx, |repository, _| repository.loaded_commit_data_for_test()) + }); + let remote_cache = cx_remote.update(|cx| { + remote_repository.update(cx, |repository, _| repository.loaded_commit_data_for_test()) + }); + + for (sha, remote_commit_data) in &remote_cache { + let local_commit_data = local_cache + .get(sha) + .unwrap_or_else(|| panic!("local cache missing commit data for {sha}")); + assert_eq!( + local_commit_data.sha, remote_commit_data.sha, + "local and remote cache should agree on sha for {sha}" + ); + assert_eq!( + local_commit_data.parents, remote_commit_data.parents, + "local and remote cache should agree on parents for {sha}" + ); + assert_eq!( + local_commit_data.author_name, remote_commit_data.author_name, + "local and remote cache should agree on author_name for {sha}" + ); + assert_eq!( + local_commit_data.author_email, remote_commit_data.author_email, + "local and remote cache should agree on author_email for {sha}" + ); + assert_eq!( + local_commit_data.commit_timestamp, remote_commit_data.commit_timestamp, + "local and remote cache should agree on commit_timestamp for {sha}" + ); + assert_eq!( + local_commit_data.subject, remote_commit_data.subject, + "local and remote cache should agree on subject for {sha}" + ); + assert_eq!( + local_commit_data.message, remote_commit_data.message, + "local and remote cache should agree on message for {sha}" + ); + } +} + #[gpui::test] async fn test_project_diff(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { let mut server = TestServer::start(cx_a.background_executor.clone()).await; @@ -480,6 +568,110 @@ async fn test_remote_git_head_sha( assert_eq!(remote_head_sha.unwrap(), local_head_sha); } +#[gpui::test] +async fn test_remote_git_commit_data_batches( + executor: BackgroundExecutor, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, +) { + let mut server = TestServer::start(executor.clone()).await; + let client_a = server.create_client(cx_a, "user_a").await; + let client_b = server.create_client(cx_b, "user_b").await; + server + .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)]) + .await; + let active_call_a = cx_a.read(ActiveCall::global); + + client_a + .fs() + .insert_tree( + path!("/project"), + json!({ ".git": {}, "file.txt": "content" }), + ) + .await; + + let commit_shas = [ + "0123456789abcdef0123456789abcdef01234567" + .parse::() + .unwrap(), + "1111111111111111111111111111111111111111" + .parse::() + .unwrap(), + "2222222222222222222222222222222222222222" + .parse::() + .unwrap(), + "3333333333333333333333333333333333333333" + .parse::() + .unwrap(), + ]; + + client_a.fs().set_commit_data( + Path::new(path!("/project/.git")), + commit_shas.iter().enumerate().map(|(index, sha)| { + ( + CommitData { + sha: *sha, + parents: Default::default(), + author_name: SharedString::from(format!("Author {index}")), + author_email: SharedString::from(format!("author{index}@example.com")), + commit_timestamp: 1_700_000_000 + index as i64, + subject: SharedString::from(format!("Subject {index}")), + message: SharedString::from(format!("Subject {index}\n\nBody {index}")), + }, + false, + ) + }), + ); + + let (project_a, _) = client_a.build_local_project(path!("/project"), cx_a).await; + executor.run_until_parked(); + + let repo_a = cx_a.update(|cx| project_a.read(cx).active_repository(cx).unwrap()); + + let primed_before = load_commit_data_batch(&repo_a, &commit_shas[..2], &executor, cx_a).await; + assert_eq!( + primed_before.len(), + 2, + "host should prime two commits before sharing" + ); + + let project_id = active_call_a + .update(cx_a, |call, cx| call.share_project(project_a.clone(), cx)) + .await + .unwrap(); + let project_b = client_b.join_remote_project(project_id, cx_b).await; + + executor.run_until_parked(); + + let repo_b = cx_b.update(|cx| project_b.read(cx).active_repository(cx).unwrap()); + + let remote_batch_one = + load_commit_data_batch(&repo_b, &commit_shas[..3], &executor, cx_b).await; + assert_eq!(remote_batch_one.len(), 3); + for (index, sha) in commit_shas[..3].iter().enumerate() { + let commit_data = remote_batch_one.get(sha).unwrap(); + assert_eq!(commit_data.sha, *sha); + assert_eq!(commit_data.subject.as_ref(), format!("Subject {index}")); + assert_eq!( + commit_data.message.as_ref(), + format!("Subject {index}\n\nBody {index}") + ); + } + + let primed_after = load_commit_data_batch(&repo_a, &commit_shas[2..], &executor, cx_a).await; + assert_eq!( + primed_after.len(), + 2, + "host should prime remaining commits after remote fetches" + ); + + let remote_batch_two = + load_commit_data_batch(&repo_b, &commit_shas[1..], &executor, cx_b).await; + assert_eq!(remote_batch_two.len(), 3); + + assert_remote_cache_matches_local_cache(&repo_a, &repo_b, cx_a, cx_b); +} + #[gpui::test] async fn test_linked_worktrees_sync( executor: BackgroundExecutor, diff --git a/crates/fs/src/fake_git_repo.rs b/crates/fs/src/fake_git_repo.rs index ca796c1d8376e6c6b03a63eb473723fb84ebfab1..f47a57ff3a9e3fcf4715df7ed353180d68ee0855 100644 --- a/crates/fs/src/fake_git_repo.rs +++ b/crates/fs/src/fake_git_repo.rs @@ -9,7 +9,7 @@ use git::{ Oid, RunHook, blame::Blame, repository::{ - AskPassDelegate, Branch, CommitDataReader, CommitDetails, CommitOptions, + AskPassDelegate, Branch, CommitData, CommitDataReader, CommitDetails, CommitOptions, CreateWorktreeTarget, FetchOptions, GRAPH_CHUNK_SIZE, GitRepository, GitRepositoryCheckpoint, InitialGraphCommitData, LogOrder, LogSource, PushOptions, RefEdit, Remote, RepoPath, ResetMode, SearchCommitArgs, Worktree, @@ -47,6 +47,12 @@ pub struct FakeCommitSnapshot { pub sha: String, } +#[derive(Debug, Clone)] +pub enum FakeCommitDataEntry { + Success(CommitData), + Fail(CommitData), +} + #[derive(Debug, Clone)] pub struct FakeGitRepositoryState { pub commit_history: Vec, @@ -67,6 +73,7 @@ pub struct FakeGitRepositoryState { pub simulated_graph_error: Option, pub refs: HashMap, pub graph_commits: Vec>, + pub commit_data: HashMap, pub stash_entries: GitStash, } @@ -88,6 +95,7 @@ impl FakeGitRepositoryState { oids: Default::default(), remotes: HashMap::default(), graph_commits: Vec::new(), + commit_data: Default::default(), commit_history: Vec::new(), stash_entries: Default::default(), } @@ -1452,7 +1460,24 @@ impl GitRepository for FakeGitRepository { } fn commit_data_reader(&self) -> Result { - anyhow::bail!("commit_data_reader not supported for FakeGitRepository") + let fs = self.fs.clone(); + let dot_git_path = self.dot_git_path.clone(); + let executor = self.executor.clone(); + Ok(CommitDataReader::for_test(executor, move |sha| { + fs.with_git_state(&dot_git_path, false, |state| { + let commit = state + .commit_data + .get(&sha) + .context(format!("graph commit data not found for {sha}"))?; + + match commit { + FakeCommitDataEntry::Success(data) => Ok(data.clone()), + FakeCommitDataEntry::Fail(_) => { + bail!("simulated commit data read failure for {sha}") + } + } + })? + })) } fn update_ref(&self, ref_name: String, commit: String) -> BoxFuture<'_, Result<()>> { diff --git a/crates/fs/src/fs.rs b/crates/fs/src/fs.rs index fa42c436f1b9beb4a17a63a61eeeb02b7e889569..6694b19e37385982e80b9349d1a3244270a1cdd5 100644 --- a/crates/fs/src/fs.rs +++ b/crates/fs/src/fs.rs @@ -53,10 +53,10 @@ mod fake_git_repo; #[cfg(feature = "test-support")] use collections::{BTreeMap, btree_map}; #[cfg(feature = "test-support")] -use fake_git_repo::FakeGitRepositoryState; +use fake_git_repo::{FakeCommitDataEntry, FakeGitRepositoryState}; #[cfg(feature = "test-support")] use git::{ - repository::{InitialGraphCommitData, RepoPath, Worktree, repo_path}, + repository::{CommitData, InitialGraphCommitData, RepoPath, Worktree, repo_path}, status::{FileStatus, StatusCode, TrackedStatus, UnmergedStatus}, }; #[cfg(feature = "test-support")] @@ -2212,6 +2212,29 @@ impl FakeFs { .unwrap(); } + pub fn set_commit_data( + &self, + dot_git: &Path, + commit_data: impl IntoIterator, + ) { + self.with_git_state(dot_git, true, |state| { + state.commit_data = commit_data + .into_iter() + .map(|(data, should_fail)| { + ( + data.sha, + if should_fail { + FakeCommitDataEntry::Fail(data) + } else { + FakeCommitDataEntry::Success(data) + }, + ) + }) + .collect(); + }) + .unwrap(); + } + /// Put the given git repository into a state with the given status, /// by mutating the head, index, and unmerged state. pub fn set_status_for_repo(&self, dot_git: &Path, statuses: &[(&str, FileStatus)]) { diff --git a/crates/git/src/repository.rs b/crates/git/src/repository.rs index 3056d91007694e35303f250e7d83d130fd2e9a38..e914460fe6a082e012cea7b4038636a4c2776ea4 100644 --- a/crates/git/src/repository.rs +++ b/crates/git/src/repository.rs @@ -99,7 +99,7 @@ pub fn original_repo_path_from_common_dir(common_dir: &Path) -> Option /// Commit data needed for the git graph visualization. #[derive(Debug, Clone)] -pub struct GraphCommitData { +pub struct CommitData { pub sha: Oid, /// Most commits have a single parent, so we use a SmallVec to avoid allocations. pub parents: SmallVec<[Oid; 1]>, @@ -107,6 +107,7 @@ pub struct GraphCommitData { pub author_email: SharedString, pub commit_timestamp: i64, pub subject: SharedString, + pub message: SharedString, } #[derive(Debug)] @@ -118,7 +119,7 @@ pub struct InitialGraphCommitData { struct CommitDataRequest { sha: Oid, - response_tx: oneshot::Sender>, + response_tx: oneshot::Sender>, } pub struct CommitDataReader { @@ -127,7 +128,7 @@ pub struct CommitDataReader { } impl CommitDataReader { - pub async fn read(&self, sha: Oid) -> Result { + pub async fn read(&self, sha: Oid) -> Result { let (response_tx, response_rx) = oneshot::channel(); self.request_tx .send(CommitDataRequest { sha, response_tx }) @@ -137,15 +138,37 @@ impl CommitDataReader { .await .map_err(|_| anyhow!("commit data reader task dropped response"))? } + + #[cfg(any(test, feature = "test-support"))] + pub fn for_test( + executor: BackgroundExecutor, + resolve: impl 'static + Send + Sync + Fn(Oid) -> Result, + ) -> Self { + let (request_tx, request_rx) = smol::channel::bounded::(64); + let resolve = Arc::new(resolve); + let delay_executor = executor.clone(); + let task = executor.spawn(async move { + while let Ok(CommitDataRequest { sha, response_tx }) = request_rx.recv().await { + delay_executor.simulate_random_delay().await; + response_tx.send(resolve(sha)).ok(); + } + }); + + Self { + request_tx, + _task: task, + } + } } -fn parse_cat_file_commit(sha: Oid, content: &str) -> Option { +fn parse_cat_file_commit(sha: Oid, content: &str) -> Option { let mut parents = SmallVec::new(); let mut author_name = SharedString::default(); let mut author_email = SharedString::default(); let mut commit_timestamp = 0i64; let mut in_headers = true; let mut subject = None; + let mut message_lines = Vec::new(); for line in content.lines() { if in_headers { @@ -172,18 +195,22 @@ fn parse_cat_file_commit(sha: Oid, content: &str) -> Option { } } } - } else if subject.is_none() { - subject = Some(SharedString::from(line.to_string())); + } else { + if subject.is_none() { + subject = Some(SharedString::from(line.to_string())); + } + message_lines.push(line); } } - Some(GraphCommitData { + Some(CommitData { sha, parents, author_name, author_email, commit_timestamp, subject: subject.unwrap_or_default(), + message: SharedString::from(message_lines.join("\n")), }) } @@ -3210,7 +3237,7 @@ async fn run_commit_data_reader( async fn read_single_commit_response( stdout: &mut R, sha: &Oid, -) -> Result { +) -> Result { let mut header_bytes = Vec::new(); stdout.read_until(b'\n', &mut header_bytes).await?; let header_line = String::from_utf8_lossy(&header_bytes); diff --git a/crates/git_graph/src/git_graph.rs b/crates/git_graph/src/git_graph.rs index e0175db09f1ef95fbd62d06ef71d20cfa63e4444..2783415bd7cafc996ca3ec069d579466183372a4 100644 --- a/crates/git_graph/src/git_graph.rs +++ b/crates/git_graph/src/git_graph.rs @@ -1251,7 +1251,7 @@ impl GitGraph { .min(self.graph_data.commits.len().saturating_sub(1))] .iter() .for_each(|commit| { - repository.fetch_commit_data(commit.data.sha, cx); + repository.fetch_commit_data(commit.data.sha, false, cx); }); }); } @@ -1270,7 +1270,9 @@ impl GitGraph { }; let data = repository.update(cx, |repository, cx| { - repository.fetch_commit_data(commit.data.sha, cx).clone() + repository + .fetch_commit_data(commit.data.sha, false, cx) + .clone() }); let short_sha = commit.data.sha.display_short(); @@ -1817,7 +1819,7 @@ impl GitGraph { let data = repository.update(cx, |repository, cx| { repository - .fetch_commit_data(commit_entry.data.sha, cx) + .fetch_commit_data(commit_entry.data.sha, false, cx) .clone() }); @@ -1846,7 +1848,7 @@ impl GitGraph { Some(data.commit_timestamp), data.subject.clone(), ), - CommitDataState::Loading => ("Loading…".into(), "".into(), None, "Loading…".into()), + CommitDataState::Loading(_) => ("Loading…".into(), "".into(), None, "Loading…".into()), }; let date_string = commit_timestamp diff --git a/crates/project/src/git_store.rs b/crates/project/src/git_store.rs index 0d4051be7e52de206e1a719a25c1a804b756ff40..99e5ad16ab2682fe3db7cb832643b990cbd62c23 100644 --- a/crates/project/src/git_store.rs +++ b/crates/project/src/git_store.rs @@ -26,16 +26,16 @@ use futures::{ oneshot::{self, Canceled}, }, future::{self, BoxFuture, Shared}, - stream::FuturesOrdered, + stream::{FuturesOrdered, FuturesUnordered}, }; use git::{ BuildPermalinkParams, GitHostingProviderRegistry, Oid, RunHook, blame::Blame, parse_git_remote_url, repository::{ - Branch, CommitDetails, CommitDiff, CommitFile, CommitOptions, CreateWorktreeTarget, - DiffType, FetchOptions, GitCommitTemplate, GitRepository, GitRepositoryCheckpoint, - GraphCommitData, InitialGraphCommitData, LogOrder, LogSource, PushOptions, Remote, + Branch, CommitData, CommitDetails, CommitDiff, CommitFile, CommitOptions, + CreateWorktreeTarget, DiffType, FetchOptions, GitCommitTemplate, GitRepository, + GitRepositoryCheckpoint, InitialGraphCommitData, LogOrder, LogSource, PushOptions, Remote, RemoteCommandOutput, RepoPath, ResetMode, SearchCommitArgs, UpstreamTrackingStatus, Worktree as GitWorktree, }, @@ -46,8 +46,8 @@ use git::{ }, }; use gpui::{ - App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Subscription, Task, - WeakEntity, + App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString, + Subscription, Task, WeakEntity, }; use language::{ Buffer, BufferEvent, Language, LanguageRegistry, @@ -62,6 +62,7 @@ use rpc::{ }; use serde::Deserialize; use settings::{Settings, WorktreeId}; +use smallvec::SmallVec; use smol::future::yield_now; use std::{ cmp::Ordering, @@ -75,7 +76,7 @@ use std::{ Arc, atomic::{self, AtomicU64}, }, - time::Instant, + time::{Duration, Instant}, }; use sum_tree::{Edit, SumTree, TreeMap}; use task::Shell; @@ -273,8 +274,8 @@ pub struct MergeDetails { #[derive(Clone)] pub enum CommitDataState { - Loading, - Loaded(Arc), + Loading(Option>>>), + Loaded(Arc), } #[derive(Clone, Debug, PartialEq, Eq)] @@ -307,14 +308,26 @@ pub struct JobInfo { pub message: SharedString, } -struct GraphCommitDataHandler { +struct CommitDataHandler { _task: Task<()>, commit_data_request: smol::channel::Sender, + completion_senders: HashMap>>, + pending_requests: HashSet, } -enum GraphCommitHandlerState { - Starting, - Open(GraphCommitDataHandler), +/// Represents the handler of a git cat-file --batch process within Zed +/// It's used to lazily fetch commit data as needed (whatever a user is viewing) +enum CommitDataHandlerState { + /// The handler is open and processing requests + Open(CommitDataHandler), + /// The handler closed because it didn't receive any requests in the last 10s + /// or hasn't been open before + Closed, +} + +enum NextCommitDataRequest { + Request(BoxFuture<'static, Result>), + Idle, Closed, } @@ -347,7 +360,7 @@ pub struct Repository { latest_askpass_id: u64, repository_state: Shared>>, initial_graph_data: HashMap<(LogSource, LogOrder), InitialGitGraphData>, - graph_commit_data_handler: GraphCommitHandlerState, + commit_data_handler: CommitDataHandlerState, commit_data: HashMap, } @@ -595,6 +608,7 @@ impl GitStore { client.add_entity_request_handler(Self::handle_get_head_sha); client.add_entity_request_handler(Self::handle_edit_ref); client.add_entity_request_handler(Self::handle_repair_worktrees); + client.add_entity_request_handler(Self::handle_get_commit_data); } pub fn is_local(&self) -> bool { @@ -2525,6 +2539,55 @@ impl GitStore { Ok(proto::GitGetHeadShaResponse { sha: head_sha }) } + async fn handle_get_commit_data( + this: Entity, + envelope: TypedEnvelope, + mut cx: AsyncApp, + ) -> Result { + let repository_id = RepositoryId::from_proto(envelope.payload.repository_id); + let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?; + + let shas: Vec = envelope + .payload + .shas + .iter() + .filter_map(|s| Oid::from_str(s).ok()) + .collect(); + + let mut commits = Vec::with_capacity(shas.len()); + let mut receivers = Vec::new(); + + repository_handle.update(&mut cx, |repository, cx| { + for &sha in &shas { + match repository.fetch_commit_data(sha, true, cx) { + CommitDataState::Loaded(data) => { + commits.push(commit_data_to_proto(data)); + } + CommitDataState::Loading(Some(shared)) => { + receivers.push(shared.clone()); + } + CommitDataState::Loading(None) => { + // todo(git_graph) this could happen if the request fails, we should encode an error case + debug_panic!( + "This should never happen since we passed true into fetch commit data" + ); + } + } + } + }); + + let results = future::join_all(receivers).await; + + commits.extend( + results + .into_iter() + .filter_map(|result| result.ok()) + .map(|data| commit_data_to_proto(&data)), + ); + + Ok(proto::GetCommitDataResponse { commits }) + } + async fn handle_edit_ref( this: Entity, envelope: TypedEnvelope, @@ -4236,7 +4299,7 @@ impl Repository { active_jobs: Default::default(), initial_graph_data: Default::default(), commit_data: Default::default(), - graph_commit_data_handler: GraphCommitHandlerState::Closed, + commit_data_handler: CommitDataHandlerState::Closed, } } @@ -4274,7 +4337,7 @@ impl Repository { job_id: 0, initial_graph_data: Default::default(), commit_data: Default::default(), - graph_commit_data_handler: GraphCommitHandlerState::Closed, + commit_data_handler: CommitDataHandlerState::Closed, } } @@ -5000,40 +5063,95 @@ impl Repository { Ok(()) } - pub fn fetch_commit_data(&mut self, sha: Oid, cx: &mut Context) -> &CommitDataState { - if !self.commit_data.contains_key(&sha) { - match &self.graph_commit_data_handler { - GraphCommitHandlerState::Open(handler) => { - if handler.commit_data_request.try_send(sha).is_ok() { - let old_value = self.commit_data.insert(sha, CommitDataState::Loading); - debug_assert!(old_value.is_none(), "We should never overwrite commit data"); - } - } - GraphCommitHandlerState::Closed => { - self.open_graph_commit_data_handler(cx); - } - GraphCommitHandlerState::Starting => {} + pub fn fetch_commit_data( + &mut self, + sha: Oid, + await_result: bool, + cx: &mut Context, + ) -> &CommitDataState { + if self.commit_data.contains_key(&sha) { + let data = &self.commit_data[&sha]; + + if let CommitDataState::Loading(None) = data + && await_result + { + let (tx, rx) = oneshot::channel(); + self.commit_data + .insert(sha, CommitDataState::Loading(Some(rx.shared()))); + + let handler = self.get_handler(cx); + handler.completion_senders.insert(sha, tx); } + + return &self.commit_data[&sha]; } - self.commit_data - .get(&sha) - .unwrap_or(&CommitDataState::Loading) + let (state, completer) = if await_result { + let (tx, rx) = oneshot::channel(); + (CommitDataState::Loading(Some(rx.shared())), Some(tx)) + } else { + (CommitDataState::Loading(None), None) + }; + + self.commit_data.insert(sha, state); + + let handler = self.get_handler(cx); + if let Some(tx) = completer { + handler.completion_senders.insert(sha, tx); + } + let mut has_failed = false; + if handler.commit_data_request.try_send(sha).is_ok() { + handler.pending_requests.insert(sha); + } else { + has_failed = true; + handler.completion_senders.remove(&sha); + debug_assert!( + matches!( + self.commit_data.remove(&sha), + Some(CommitDataState::Loading(_)) + ), + "Commit data should still be loading when enqueueing the request fails" + ); + } + + &self.commit_data.get(&sha).unwrap_or_else(|| { + debug_assert!(!has_failed, "This should always be inserted"); + &CommitDataState::Loading(None) + }) } - fn open_graph_commit_data_handler(&mut self, cx: &mut Context) { - self.graph_commit_data_handler = GraphCommitHandlerState::Starting; + fn get_handler(&mut self, cx: &mut Context) -> &mut CommitDataHandler { + if matches!(self.commit_data_handler, CommitDataHandlerState::Closed) { + self.commit_data_handler = + CommitDataHandlerState::Open(self.open_commit_data_handler(cx)); + } + match &mut self.commit_data_handler { + CommitDataHandlerState::Open(handler) => handler, + CommitDataHandlerState::Closed => unreachable!(), + } + } + + fn open_commit_data_handler(&self, cx: &Context) -> CommitDataHandler { let state = self.repository_state.clone(); - let (result_tx, result_rx) = smol::channel::bounded::<(Oid, GraphCommitData)>(64); + let (result_tx, result_rx) = smol::channel::bounded::<(Oid, CommitData)>(64); let (request_tx, request_rx) = smol::channel::unbounded::(); let foreground_task = cx.spawn(async move |this, cx| { while let Ok((sha, commit_data)) = result_rx.recv().await { let result = this.update(cx, |this, cx| { - let old_value = this - .commit_data - .insert(sha, CommitDataState::Loaded(Arc::new(commit_data))); + let data = Arc::new(commit_data); + + if let CommitDataHandlerState::Open(handler) = &mut this.commit_data_handler { + handler.pending_requests.remove(&sha); + if let Some(completion_sender) = handler.completion_senders.remove(&sha) { + completion_sender.send(data.clone()).ok(); + } + } else { + debug_panic!("The handler state has to be open for this task to exist"); + } + + let old_value = this.commit_data.insert(sha, CommitDataState::Loaded(data)); debug_assert!( !matches!(old_value, Some(CommitDataState::Loaded(_))), "We should never overwrite commit data" @@ -5047,69 +5165,250 @@ impl Repository { } this.update(cx, |this, _cx| { - this.graph_commit_data_handler = GraphCommitHandlerState::Closed; + let CommitDataHandlerState::Open(handler) = std::mem::replace( + &mut this.commit_data_handler, + CommitDataHandlerState::Closed, + ) else { + debug_panic!("The handler state has to be open for this task to exist"); + return; + }; + + for sha in handler.pending_requests { + this.commit_data.remove(&sha); + } }) .ok(); }); let request_tx_for_handler = request_tx; + let repository_id = self.id; let background_executor = cx.background_executor().clone(); cx.background_spawn(async move { - let backend = match state.await { - Ok(RepositoryState::Local(LocalRepositoryState { backend, .. })) => backend, - Ok(RepositoryState::Remote(_)) => { - log::error!("commit_data_reader not supported for remote repositories"); - return; + match state.await { + Ok(RepositoryState::Local(LocalRepositoryState { backend, .. })) => { + Self::local_commit_data_reader( + backend, + request_rx, + result_tx, + background_executor, + ) + .await; + } + Ok(RepositoryState::Remote(RemoteRepositoryState { project_id, client })) => { + Self::remote_commit_data_reader( + project_id, + client, + repository_id, + request_rx, + result_tx, + background_executor, + ) + .await; } Err(error) => { log::error!("failed to get repository state: {error}"); return; } }; + }) + .detach(); - let reader = match backend.commit_data_reader() { - Ok(reader) => reader, - Err(error) => { - log::error!("failed to create commit data reader: {error:?}"); - return; - } - }; + CommitDataHandler { + _task: foreground_task, + commit_data_request: request_tx_for_handler, + completion_senders: HashMap::default(), + pending_requests: HashSet::default(), + } + } - loop { - let timeout = background_executor.timer(std::time::Duration::from_secs(10)); + async fn local_commit_data_reader( + backend: Arc, + request_rx: smol::channel::Receiver, + result_tx: smol::channel::Sender<(Oid, CommitData)>, + background_executor: BackgroundExecutor, + ) { + let reader = match backend.commit_data_reader() { + Ok(reader) => reader, + Err(error) => { + log::error!("failed to create commit data reader: {error:?}"); + return; + } + }; - futures::select_biased! { - sha = futures::FutureExt::fuse(request_rx.recv()) => { - let Ok(sha) = sha else { - break; - }; + loop { + let timeout = background_executor.timer(std::time::Duration::from_secs(10)); - match reader.read(sha).await { - Ok(commit_data) => { - if result_tx.send((sha, commit_data)).await.is_err() { - break; - } - } - Err(error) => { - log::error!("failed to read commit data for {sha}: {error:?}"); + futures::select_biased! { + sha = futures::FutureExt::fuse(request_rx.recv()) => { + let Ok(sha) = sha else { + break; + }; + + match reader.read(sha).await { + Ok(commit_data) => { + if result_tx.send((sha, commit_data)).await.is_err() { + break; } } + Err(error) => { + log::error!("failed to read commit data for {sha}: {error:?}"); + } } - _ = futures::FutureExt::fuse(timeout) => { - break; + } + _ = futures::FutureExt::fuse(timeout) => { + break; + } + } + } + + drop(result_tx); + } + + async fn remote_commit_data_reader( + project_id: ProjectId, + client: AnyProtoClient, + repository_id: RepositoryId, + request_rx: smol::channel::Receiver, + result_tx: smol::channel::Sender<(Oid, CommitData)>, + background_executor: BackgroundExecutor, + ) { + let mut response_futures = + FuturesUnordered::>>::new(); + let mut accept_requests = true; + let mut next_request = Self::get_next_request( + project_id, + client.clone(), + repository_id, + &request_rx, + &background_executor, + ) + .boxed() + .fuse(); + + loop { + if !accept_requests && response_futures.is_empty() { + break; + } + + if response_futures.is_empty() { + match (&mut next_request).await { + NextCommitDataRequest::Request(request) => { + response_futures.push(request); + next_request = Self::get_next_request( + project_id, + client.clone(), + repository_id, + &request_rx, + &background_executor, + ) + .boxed() + .fuse(); } + NextCommitDataRequest::Closed | NextCommitDataRequest::Idle => break, } } - drop(result_tx); - }) - .detach(); + let next_response = response_futures.next().fuse(); + futures::pin_mut!(next_response); - self.graph_commit_data_handler = GraphCommitHandlerState::Open(GraphCommitDataHandler { - _task: foreground_task, - commit_data_request: request_tx_for_handler, - }); + futures::select_biased! { + request = next_request => { + match request { + NextCommitDataRequest::Request(request) => { + response_futures.push(request); + } + NextCommitDataRequest::Idle => {} + NextCommitDataRequest::Closed => { + accept_requests = false; + } + } + + if accept_requests { + next_request = Self::get_next_request( + project_id, + client.clone(), + repository_id, + &request_rx, + &background_executor, + ) + .boxed() + .fuse(); + } + } + result = next_response => { + let Some(result) = result else { + continue; + }; + + if let Ok(commit_data) = result { + for commit in commit_data.commits { + let Ok(commit_data) = commit_data_from_proto(commit) else { + continue; + }; + + if result_tx + .send((commit_data.sha, commit_data)) + .await + .is_err() + { + return; + } + } + } + } + } + } + + drop(result_tx); + } + + async fn get_next_request( + project_id: ProjectId, + client: AnyProtoClient, + repository_id: RepositoryId, + request_rx: &smol::channel::Receiver, + background_executor: &BackgroundExecutor, + ) -> NextCommitDataRequest { + let mut queued_shas = Vec::with_capacity(64); + + loop { + if queued_shas.len() >= 64 { + break; + } + + let timeout = background_executor.timer(Duration::from_millis(5)); + + futures::select_biased! { + sha = futures::FutureExt::fuse(request_rx.recv()) => { + let Ok(sha) = sha else { + break; + }; + + queued_shas.push(sha); + + } + _ = futures::FutureExt::fuse(timeout) => { + break; + } + } + } + + if queued_shas.is_empty() && request_rx.is_closed() { + NextCommitDataRequest::Closed + } else if queued_shas.is_empty() { + NextCommitDataRequest::Idle + } else { + NextCommitDataRequest::Request( + client + .request(proto::GetCommitData { + project_id: project_id.to_proto(), + repository_id: repository_id.to_proto(), + shas: queued_shas.into_iter().map(|oid| oid.to_string()).collect(), + }) + .boxed(), + ) + } } fn buffer_store(&self, cx: &App) -> Option> { @@ -7625,6 +7924,35 @@ fn deserialize_blame_buffer_response( Some(Blame { entries, messages }) } +fn commit_data_to_proto(commit: &CommitData) -> proto::CommitData { + proto::CommitData { + sha: commit.sha.to_string(), + parents: commit.parents.iter().map(|p| p.to_string()).collect(), + author_name: commit.author_name.to_string(), + author_email: commit.author_email.to_string(), + commit_timestamp: commit.commit_timestamp, + subject: commit.subject.to_string(), + message: commit.message.to_string(), + } +} + +fn commit_data_from_proto(commit: proto::CommitData) -> Result { + let sha = Oid::from_str(&commit.sha)?; + let mut parents = SmallVec::with_capacity(commit.parents.len()); + for parent in &commit.parents { + parents.push(Oid::from_str(parent)?); + } + Ok(CommitData { + sha, + parents, + author_name: SharedString::from(commit.author_name), + author_email: SharedString::from(commit.author_email), + commit_timestamp: commit.commit_timestamp, + subject: SharedString::from(commit.subject), + message: SharedString::from(commit.message), + }) +} + fn branch_to_proto(branch: &git::repository::Branch) -> proto::Branch { proto::Branch { is_head: branch.is_head, @@ -7735,6 +8063,354 @@ fn proto_to_commit_details(proto: &proto::GitCommitDetails) -> CommitDetails { } } +#[cfg(any(test, feature = "test-support"))] +impl Repository { + pub fn loaded_commit_data_for_test(&self) -> HashMap { + self.commit_data + .iter() + .filter_map(|(sha, state)| match state { + CommitDataState::Loaded(data) => Some((*sha, data.as_ref().clone())), + CommitDataState::Loading(_) => None, + }) + .collect() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::Project; + use fs::FakeFs; + use gpui::TestAppContext; + use gpui::proptest::prelude::*; + use rand::{SeedableRng, rngs::StdRng}; + use serde_json::json; + use settings::SettingsStore; + use std::path::Path; + + fn init_test(cx: &mut TestAppContext) { + cx.update(|cx| { + let settings_store = SettingsStore::test(cx); + cx.set_global(settings_store); + }); + } + + fn verify_invariants(repository: &Repository) -> anyhow::Result<()> { + match &repository.commit_data_handler { + CommitDataHandlerState::Open(handler) => { + verify_loading_entries_are_pending(repository, handler)?; + verify_await_result_loading_entries_have_completion_senders(repository, handler)?; + verify_pending_requests_are_loading(repository, handler)?; + verify_completion_senders_are_await_result_loading(repository, handler)?; + verify_completion_senders_are_pending(handler)?; + verify_non_await_result_loading_entries_have_no_completion_sender( + repository, handler, + )?; + verify_loaded_entries_are_not_pending(repository, handler)?; + verify_loaded_entries_have_no_completion_sender(repository, handler)?; + } + CommitDataHandlerState::Closed => { + verify_closed_handler_invariants(repository)?; + } + } + + Ok(()) + } + + fn verify_loading_entries_are_pending( + repository: &Repository, + handler: &CommitDataHandler, + ) -> anyhow::Result<()> { + for (sha, state) in &repository.commit_data { + if matches!(state, CommitDataState::Loading(_)) { + anyhow::ensure!( + handler.pending_requests.contains(sha), + "loading commit data for {sha} must be tracked in pending_requests" + ); + } + } + + Ok(()) + } + + fn verify_await_result_loading_entries_have_completion_senders( + repository: &Repository, + handler: &CommitDataHandler, + ) -> anyhow::Result<()> { + for (sha, state) in &repository.commit_data { + if matches!(state, CommitDataState::Loading(Some(_))) { + anyhow::ensure!( + handler.completion_senders.contains_key(sha), + "await-result loading commit data for {sha} must have a completion sender" + ); + } + } + + Ok(()) + } + + fn verify_pending_requests_are_loading( + repository: &Repository, + handler: &CommitDataHandler, + ) -> anyhow::Result<()> { + for sha in &handler.pending_requests { + anyhow::ensure!( + matches!( + repository.commit_data.get(sha), + Some(CommitDataState::Loading(_)) + ), + "pending request for {sha} must correspond to loading commit data" + ); + } + + Ok(()) + } + + fn verify_completion_senders_are_await_result_loading( + repository: &Repository, + handler: &CommitDataHandler, + ) -> anyhow::Result<()> { + for sha in handler.completion_senders.keys() { + anyhow::ensure!( + matches!( + repository.commit_data.get(sha), + Some(CommitDataState::Loading(Some(_))) + ), + "completion sender for {sha} must correspond to await-result loading commit data" + ); + } + + Ok(()) + } + + fn verify_completion_senders_are_pending(handler: &CommitDataHandler) -> anyhow::Result<()> { + for sha in handler.completion_senders.keys() { + anyhow::ensure!( + handler.pending_requests.contains(sha), + "completion sender for {sha} must also be tracked as pending" + ); + } + + Ok(()) + } + + fn verify_non_await_result_loading_entries_have_no_completion_sender( + repository: &Repository, + handler: &CommitDataHandler, + ) -> anyhow::Result<()> { + for (sha, state) in &repository.commit_data { + if matches!(state, CommitDataState::Loading(None)) { + anyhow::ensure!( + !handler.completion_senders.contains_key(sha), + "non-await-result loading commit data for {sha} must not have a completion sender" + ); + } + } + + Ok(()) + } + + fn verify_loaded_entries_are_not_pending( + repository: &Repository, + handler: &CommitDataHandler, + ) -> anyhow::Result<()> { + for (sha, state) in &repository.commit_data { + if matches!(state, CommitDataState::Loaded(_)) { + anyhow::ensure!( + !handler.pending_requests.contains(sha), + "loaded commit data for {sha} must not still be pending" + ); + } + } + + Ok(()) + } + + fn verify_loaded_entries_have_no_completion_sender( + repository: &Repository, + handler: &CommitDataHandler, + ) -> anyhow::Result<()> { + for (sha, state) in &repository.commit_data { + if matches!(state, CommitDataState::Loaded(_)) { + anyhow::ensure!( + !handler.completion_senders.contains_key(sha), + "loaded commit data for {sha} must not keep a completion sender" + ); + } + } + + Ok(()) + } + + fn verify_closed_handler_invariants(repository: &Repository) -> anyhow::Result<()> { + for (sha, state) in &repository.commit_data { + anyhow::ensure!( + !matches!(state, CommitDataState::Loading(_)), + "closed handler must not keep loading commit data for {sha}" + ); + } + + Ok(()) + } + + #[gpui::property_test(config = ProptestConfig { + cases: 20, + ..Default::default() + })] + async fn test_commit_data_random_invariants( + #[strategy = any::()] seed: u64, + #[strategy = gpui::proptest::collection::vec(0usize..2000, 1..200)] commit_indexes: Vec< + usize, + >, + #[strategy = gpui::proptest::collection::vec(any::(), 1..200)] await_results: Vec< + bool, + >, + #[strategy = gpui::proptest::collection::vec(0usize..2000, 0..200)] failing_commit_indexes: Vec< + usize, + >, + #[strategy = gpui::proptest::collection::vec(0usize..2000, 0..200)] missing_commit_indexes: Vec< + usize, + >, + cx: &mut TestAppContext, + ) { + init_test(cx); + let mut rng = StdRng::seed_from_u64(seed); + + let commit_shas = (0..2000).map(|_| Oid::random(&mut rng)).collect::>(); + let failing_shas = failing_commit_indexes + .into_iter() + .map(|index| commit_shas[index % commit_shas.len()]) + .collect::>(); + let missing_shas = missing_commit_indexes + .into_iter() + .map(|index| commit_shas[index % commit_shas.len()]) + .collect::>(); + let commit_data = commit_shas + .iter() + .filter(|sha| !missing_shas.contains(sha)) + .map(|sha| { + ( + CommitData { + sha: *sha, + parents: SmallVec::new(), + author_name: SharedString::from(format!("Author {sha}")), + author_email: SharedString::from(format!("{sha}@example.com")), + commit_timestamp: rng.random_range(0..10_000), + subject: SharedString::from(format!("Subject {sha}")), + message: SharedString::from(format!("Subject {sha}\n\nBody for {sha}")), + }, + failing_shas.contains(sha), + ) + }) + .collect::>(); + let expected_loaded_shas = commit_indexes + .iter() + .map(|index| commit_shas[index % commit_shas.len()]) + .filter(|sha| !failing_shas.contains(sha) && !missing_shas.contains(sha)) + .collect::>(); + + let fs = FakeFs::new(cx.executor()); + fs.insert_tree( + Path::new("/project"), + json!({ + ".git": {}, + "file.txt": "content", + }), + ) + .await; + fs.set_commit_data(Path::new("/project/.git"), commit_data); + + let project = Project::test(fs.clone(), [Path::new("/project")], cx).await; + project + .update(cx, |project, cx| project.git_scans_complete(cx)) + .await; + + let repository = project.read_with(cx, |project, cx| { + project + .active_repository(cx) + .expect("should have a repository") + }); + + cx.update(|cx| { + cx.observe(&repository, |repo, cx| { + verify_invariants(repo.read(cx)) + .context("Invariant weren't held after a cx.notify") + .unwrap(); + }) + }) + .detach(); + + let mut next_step = 0; + while next_step < commit_indexes.len() { + let remaining_steps = commit_indexes.len() - next_step; + let chunk_size = rng.random_range(1..=remaining_steps.min(16)); + let chunk_end = next_step + chunk_size; + + for step in next_step..chunk_end { + let sha = commit_shas[commit_indexes[step] % commit_shas.len()]; + let await_result = await_results[step % await_results.len()]; + + repository.update(cx, |repository, cx| { + repository.fetch_commit_data(sha, await_result, cx); + verify_invariants(repository) + .with_context(|| { + format!( + "commit data invariant violation after step {} for sha {}", + step + 1, + sha, + ) + }) + .unwrap(); + }); + } + + cx.run_until_parked(); + repository.read_with(cx, |repository, _cx| { + verify_invariants(repository) + .with_context(|| { + format!( + "commit data invariant violation after draining through step {}", + chunk_end, + ) + }) + .unwrap(); + }); + + next_step = chunk_end; + } + + cx.run_until_parked(); + repository.read_with(cx, |repository, _cx| { + verify_invariants(repository) + .with_context(|| "commit data invariant violation after final drain".to_string()) + .unwrap(); + + let loaded_shas = repository + .commit_data + .iter() + .filter_map(|(sha, state)| match state { + CommitDataState::Loaded(_) => Some(*sha), + CommitDataState::Loading(_) => None, + }) + .collect::>(); + let missing_loaded_shas = expected_loaded_shas + .difference(&loaded_shas) + .copied() + .collect::>(); + let unexpected_loaded_shas = loaded_shas + .difference(&expected_loaded_shas) + .copied() + .collect::>(); + assert!( + missing_loaded_shas.is_empty() && unexpected_loaded_shas.is_empty(), + "loaded commit data SHAs after final drain did not match expectation. missing: {:?}, unexpected: {:?}", + missing_loaded_shas, + unexpected_loaded_shas, + ); + }); + } +} + /// This snapshot computes the repository state on the foreground thread while /// running the git commands on the background thread. We update branch, head, /// remotes, and worktrees first so the UI can react sooner, then compute file diff --git a/crates/proto/proto/git.proto b/crates/proto/proto/git.proto index 78f3fb2aea9dec9f18a2086c0d7599f729dae174..bb851ddbd3b3b7fc03a151f53d43e651dbc59700 100644 --- a/crates/proto/proto/git.proto +++ b/crates/proto/proto/git.proto @@ -692,3 +692,23 @@ message RunGitHook { uint64 repository_id = 2; GitHook hook = 3; } + +message GetCommitData { + uint64 project_id = 1; + uint64 repository_id = 2; + repeated string shas = 3; +} + +message CommitData { + string sha = 1; + repeated string parents = 2; + string author_name = 3; + string author_email = 4; + int64 commit_timestamp = 5; + string subject = 6; + string message = 7; +} + +message GetCommitDataResponse { + repeated CommitData commits = 1; +} diff --git a/crates/proto/proto/zed.proto b/crates/proto/proto/zed.proto index 1da7b96892263385f13df19f6208898cfe090266..568c29435066c8e5ff21f67b316c7a70e4a43da5 100644 --- a/crates/proto/proto/zed.proto +++ b/crates/proto/proto/zed.proto @@ -481,7 +481,9 @@ message Envelope { GitEditRef git_edit_ref = 443; GitCreateArchiveCheckpoint git_create_archive_checkpoint = 444; GitCreateArchiveCheckpointResponse git_create_archive_checkpoint_response = 445; - GitRestoreArchiveCheckpoint git_restore_archive_checkpoint = 446; // current max + GitRestoreArchiveCheckpoint git_restore_archive_checkpoint = 446; + GetCommitData get_commit_data = 447; + GetCommitDataResponse get_commit_data_response = 448; // current max } reserved 87 to 88; diff --git a/crates/proto/src/proto.rs b/crates/proto/src/proto.rs index 83a559cb28330601424d3d4f2d2efc6191b3ebb9..a4d382e40f3240bf6fd1522c886fdf5789d685e9 100644 --- a/crates/proto/src/proto.rs +++ b/crates/proto/src/proto.rs @@ -358,6 +358,8 @@ messages!( (GitGetHeadShaResponse, Background), (GitEditRef, Background), (GitRepairWorktrees, Background), + (GetCommitData, Background), + (GetCommitDataResponse, Background), (GitWorktreesResponse, Background), (GitCreateWorktree, Background), (GitRemoveWorktree, Background), @@ -573,6 +575,7 @@ request_messages!( (GitGetHeadSha, GitGetHeadShaResponse), (GitEditRef, Ack), (GitRepairWorktrees, Ack), + (GetCommitData, GetCommitDataResponse), (GitCreateWorktree, Ack), (GitRemoveWorktree, Ack), (GitRenameWorktree, Ack), @@ -767,6 +770,7 @@ entity_messages!( GitGetHeadSha, GitEditRef, GitRepairWorktrees, + GetCommitData, GitCreateArchiveCheckpoint, GitRestoreArchiveCheckpoint, GitCreateWorktree,