@@ -4,12 +4,16 @@ use call::ActiveCall;
use client::RECEIVE_TIMEOUT;
use collections::HashMap;
use git::{
- repository::{RepoPath, Worktree as GitWorktree},
+ Oid,
+ repository::{CommitData, RepoPath, Worktree as GitWorktree},
status::{DiffStat, FileStatus, StatusCode, TrackedStatus},
};
use git_ui::{git_panel::GitPanel, project_diff::ProjectDiff};
-use gpui::{AppContext as _, BackgroundExecutor, TestAppContext, VisualTestContext};
-use project::ProjectPath;
+use gpui::{AppContext as _, BackgroundExecutor, SharedString, TestAppContext, VisualTestContext};
+use project::{
+ ProjectPath,
+ git_store::{CommitDataState, Repository},
+};
use serde_json::json;
use util::{path, rel_path::rel_path};
@@ -91,6 +95,90 @@ fn collect_diff_stats<C: gpui::AppContext>(
})
}
+/// Test helper: requests commit data for every SHA in `shas` on `repository`
+/// and awaits all results, returning the loaded data keyed by SHA.
+///
+/// Passes `await_result = true` into `fetch_commit_data`, so every returned
+/// state is either `Loaded` or `Loading(Some(..))` with an awaitable receiver.
+async fn load_commit_data_batch(
+ repository: &gpui::Entity<Repository>,
+ shas: &[Oid],
+ executor: &BackgroundExecutor,
+ cx: &mut TestAppContext,
+) -> HashMap<Oid, CommitData> {
+ // Kick off all fetches first so the handler can batch them together.
+ let states = cx.update(|cx| {
+ shas.iter()
+ .map(|sha| {
+ (
+ *sha,
+ repository.update(cx, |repository, cx| {
+ repository.fetch_commit_data(*sha, true, cx).clone()
+ }),
+ )
+ })
+ .collect::<Vec<_>>()
+ });
+
+ // Let the background handler drain its request queue.
+ executor.run_until_parked();
+
+ let mut commit_data = HashMap::default();
+ for (sha, state) in states {
+ let data = match state {
+ CommitDataState::Loaded(data) => data.as_ref().clone(),
+ CommitDataState::Loading(Some(shared)) => shared.await.unwrap().as_ref().clone(),
+ CommitDataState::Loading(None) => {
+ panic!("fetch_commit_data(..., true) should return an await-result state")
+ }
+ };
+ commit_data.insert(sha, data);
+ }
+
+ commit_data
+}
+
+/// Asserts that every commit cached on the remote (guest) repository is also
+/// cached on the local (host) repository with identical contents.
+///
+/// Note: this checks remote ⊆ local, not equality — the host may legitimately
+/// have primed commits the guest never requested. Fields are compared one by
+/// one so a mismatch pinpoints the offending field in the failure message.
+fn assert_remote_cache_matches_local_cache(
+ local_repository: &gpui::Entity<Repository>,
+ remote_repository: &gpui::Entity<Repository>,
+ cx_local: &mut TestAppContext,
+ cx_remote: &mut TestAppContext,
+) {
+ let local_cache = cx_local.update(|cx| {
+ local_repository.update(cx, |repository, _| repository.loaded_commit_data_for_test())
+ });
+ let remote_cache = cx_remote.update(|cx| {
+ remote_repository.update(cx, |repository, _| repository.loaded_commit_data_for_test())
+ });
+
+ for (sha, remote_commit_data) in &remote_cache {
+ let local_commit_data = local_cache
+ .get(sha)
+ .unwrap_or_else(|| panic!("local cache missing commit data for {sha}"));
+ assert_eq!(
+ local_commit_data.sha, remote_commit_data.sha,
+ "local and remote cache should agree on sha for {sha}"
+ );
+ assert_eq!(
+ local_commit_data.parents, remote_commit_data.parents,
+ "local and remote cache should agree on parents for {sha}"
+ );
+ assert_eq!(
+ local_commit_data.author_name, remote_commit_data.author_name,
+ "local and remote cache should agree on author_name for {sha}"
+ );
+ assert_eq!(
+ local_commit_data.author_email, remote_commit_data.author_email,
+ "local and remote cache should agree on author_email for {sha}"
+ );
+ assert_eq!(
+ local_commit_data.commit_timestamp, remote_commit_data.commit_timestamp,
+ "local and remote cache should agree on commit_timestamp for {sha}"
+ );
+ assert_eq!(
+ local_commit_data.subject, remote_commit_data.subject,
+ "local and remote cache should agree on subject for {sha}"
+ );
+ assert_eq!(
+ local_commit_data.message, remote_commit_data.message,
+ "local and remote cache should agree on message for {sha}"
+ );
+ }
+}
+
#[gpui::test]
async fn test_project_diff(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
let mut server = TestServer::start(cx_a.background_executor.clone()).await;
@@ -480,6 +568,110 @@ async fn test_remote_git_head_sha(
assert_eq!(remote_head_sha.unwrap(), local_head_sha);
}
+/// Collab test: a guest (client B) fetches commit data in batches over RPC
+/// from a host (client A), including SHAs the host has already primed, SHAs
+/// the host primes afterwards, and SHAs overlapping both batches. Finally
+/// verifies the guest's cache is a subset of the host's with equal contents.
+#[gpui::test]
+async fn test_remote_git_commit_data_batches(
+ executor: BackgroundExecutor,
+ cx_a: &mut TestAppContext,
+ cx_b: &mut TestAppContext,
+) {
+ let mut server = TestServer::start(executor.clone()).await;
+ let client_a = server.create_client(cx_a, "user_a").await;
+ let client_b = server.create_client(cx_b, "user_b").await;
+ server
+ .create_room(&mut [(&client_a, cx_a), (&client_b, cx_b)])
+ .await;
+ let active_call_a = cx_a.read(ActiveCall::global);
+
+ client_a
+ .fs()
+ .insert_tree(
+ path!("/project"),
+ json!({ ".git": {}, "file.txt": "content" }),
+ )
+ .await;
+
+ let commit_shas = [
+ "0123456789abcdef0123456789abcdef01234567"
+ .parse::<Oid>()
+ .unwrap(),
+ "1111111111111111111111111111111111111111"
+ .parse::<Oid>()
+ .unwrap(),
+ "2222222222222222222222222222222222222222"
+ .parse::<Oid>()
+ .unwrap(),
+ "3333333333333333333333333333333333333333"
+ .parse::<Oid>()
+ .unwrap(),
+ ];
+
+ // Seed the fake git backend with data for all four commits. The second
+ // tuple element (`false`) marks the commit as readable (non-failing).
+ client_a.fs().set_commit_data(
+ Path::new(path!("/project/.git")),
+ commit_shas.iter().enumerate().map(|(index, sha)| {
+ (
+ CommitData {
+ sha: *sha,
+ parents: Default::default(),
+ author_name: SharedString::from(format!("Author {index}")),
+ author_email: SharedString::from(format!("author{index}@example.com")),
+ commit_timestamp: 1_700_000_000 + index as i64,
+ subject: SharedString::from(format!("Subject {index}")),
+ message: SharedString::from(format!("Subject {index}\n\nBody {index}")),
+ },
+ false,
+ )
+ }),
+ );
+
+ let (project_a, _) = client_a.build_local_project(path!("/project"), cx_a).await;
+ executor.run_until_parked();
+
+ let repo_a = cx_a.update(|cx| project_a.read(cx).active_repository(cx).unwrap());
+
+ // Prime the host's cache with the first two commits BEFORE sharing.
+ let primed_before = load_commit_data_batch(&repo_a, &commit_shas[..2], &executor, cx_a).await;
+ assert_eq!(
+ primed_before.len(),
+ 2,
+ "host should prime two commits before sharing"
+ );
+
+ let project_id = active_call_a
+ .update(cx_a, |call, cx| call.share_project(project_a.clone(), cx))
+ .await
+ .unwrap();
+ let project_b = client_b.join_remote_project(project_id, cx_b).await;
+
+ executor.run_until_parked();
+
+ let repo_b = cx_b.update(|cx| project_b.read(cx).active_repository(cx).unwrap());
+
+ // Guest batch 1 overlaps the host-primed commits (0..2) plus a fresh one.
+ let remote_batch_one =
+ load_commit_data_batch(&repo_b, &commit_shas[..3], &executor, cx_b).await;
+ assert_eq!(remote_batch_one.len(), 3);
+ for (index, sha) in commit_shas[..3].iter().enumerate() {
+ let commit_data = remote_batch_one.get(sha).unwrap();
+ assert_eq!(commit_data.sha, *sha);
+ assert_eq!(commit_data.subject.as_ref(), format!("Subject {index}"));
+ assert_eq!(
+ commit_data.message.as_ref(),
+ format!("Subject {index}\n\nBody {index}")
+ );
+ }
+
+ // Host primes the remaining commits AFTER the guest's first batch.
+ let primed_after = load_commit_data_batch(&repo_a, &commit_shas[2..], &executor, cx_a).await;
+ assert_eq!(
+ primed_after.len(),
+ 2,
+ "host should prime remaining commits after remote fetches"
+ );
+
+ // Guest batch 2 overlaps batch 1 (index 1 and 2) and adds the last commit.
+ let remote_batch_two =
+ load_commit_data_batch(&repo_b, &commit_shas[1..], &executor, cx_b).await;
+ assert_eq!(remote_batch_two.len(), 3);
+
+ assert_remote_cache_matches_local_cache(&repo_a, &repo_b, cx_a, cx_b);
+}
+
#[gpui::test]
async fn test_linked_worktrees_sync(
executor: BackgroundExecutor,
@@ -26,16 +26,16 @@ use futures::{
oneshot::{self, Canceled},
},
future::{self, BoxFuture, Shared},
- stream::FuturesOrdered,
+ stream::{FuturesOrdered, FuturesUnordered},
};
use git::{
BuildPermalinkParams, GitHostingProviderRegistry, Oid, RunHook,
blame::Blame,
parse_git_remote_url,
repository::{
- Branch, CommitDetails, CommitDiff, CommitFile, CommitOptions, CreateWorktreeTarget,
- DiffType, FetchOptions, GitCommitTemplate, GitRepository, GitRepositoryCheckpoint,
- GraphCommitData, InitialGraphCommitData, LogOrder, LogSource, PushOptions, Remote,
+ Branch, CommitData, CommitDetails, CommitDiff, CommitFile, CommitOptions,
+ CreateWorktreeTarget, DiffType, FetchOptions, GitCommitTemplate, GitRepository,
+ GitRepositoryCheckpoint, InitialGraphCommitData, LogOrder, LogSource, PushOptions, Remote,
RemoteCommandOutput, RepoPath, ResetMode, SearchCommitArgs, UpstreamTrackingStatus,
Worktree as GitWorktree,
},
@@ -46,8 +46,8 @@ use git::{
},
};
use gpui::{
- App, AppContext, AsyncApp, Context, Entity, EventEmitter, SharedString, Subscription, Task,
- WeakEntity,
+ App, AppContext, AsyncApp, BackgroundExecutor, Context, Entity, EventEmitter, SharedString,
+ Subscription, Task, WeakEntity,
};
use language::{
Buffer, BufferEvent, Language, LanguageRegistry,
@@ -62,6 +62,7 @@ use rpc::{
};
use serde::Deserialize;
use settings::{Settings, WorktreeId};
+use smallvec::SmallVec;
use smol::future::yield_now;
use std::{
cmp::Ordering,
@@ -75,7 +76,7 @@ use std::{
Arc,
atomic::{self, AtomicU64},
},
- time::Instant,
+ time::{Duration, Instant},
};
use sum_tree::{Edit, SumTree, TreeMap};
use task::Shell;
@@ -273,8 +274,8 @@ pub struct MergeDetails {
 #[derive(Clone)]
 pub enum CommitDataState {
-    Loading,
-    Loaded(Arc<GraphCommitData>),
+ /// The commit is being fetched. `Some` carries a shared receiver that
+ /// callers can await; `None` means no caller asked to await the result.
+ Loading(Option<Shared<oneshot::Receiver<Arc<CommitData>>>>),
+ /// The commit data has been fetched and is cached.
+ Loaded(Arc<CommitData>),
 }
#[derive(Clone, Debug, PartialEq, Eq)]
@@ -307,14 +308,26 @@ pub struct JobInfo {
pub message: SharedString,
}
-struct GraphCommitDataHandler {
+/// Live handle to the background commit-data reader task.
+struct CommitDataHandler {
 _task: Task<()>,
 commit_data_request: smol::channel::Sender<Oid>,
+ // Senders resolved when a commit a caller awaits finishes loading.
+ completion_senders: HashMap<Oid, oneshot::Sender<Arc<CommitData>>>,
+ // SHAs successfully enqueued with the reader but not yet answered.
+ pending_requests: HashSet<Oid>,
 }
-enum GraphCommitHandlerState {
-    Starting,
-    Open(GraphCommitDataHandler),
+/// Represents the handle to a `git cat-file --batch`-style reader within Zed.
+/// It is used to lazily fetch commit data on demand (for whichever commits
+/// the user is currently viewing).
+enum CommitDataHandlerState {
+ /// The handler is open and processing requests
+ Open(CommitDataHandler),
+ /// The handler is closed: either it has never been opened, or it shut
+ /// down after receiving no requests for 10s
+ Closed,
+}
+
+/// Outcome of draining the request queue when batching remote RPC requests.
+enum NextCommitDataRequest {
+ /// A batch was queued; holds the in-flight `GetCommitData` RPC future.
+ Request(BoxFuture<'static, Result<proto::GetCommitDataResponse>>),
+ /// No SHAs arrived within the batching window.
+ Idle,
+ /// The request channel is closed; no further requests will arrive.
 Closed,
 }
@@ -347,7 +360,7 @@ pub struct Repository {
latest_askpass_id: u64,
repository_state: Shared<Task<Result<RepositoryState, String>>>,
initial_graph_data: HashMap<(LogSource, LogOrder), InitialGitGraphData>,
- graph_commit_data_handler: GraphCommitHandlerState,
+ commit_data_handler: CommitDataHandlerState,
commit_data: HashMap<Oid, CommitDataState>,
}
@@ -595,6 +608,7 @@ impl GitStore {
client.add_entity_request_handler(Self::handle_get_head_sha);
client.add_entity_request_handler(Self::handle_edit_ref);
client.add_entity_request_handler(Self::handle_repair_worktrees);
+ client.add_entity_request_handler(Self::handle_get_commit_data);
}
pub fn is_local(&self) -> bool {
@@ -2525,6 +2539,55 @@ impl GitStore {
Ok(proto::GitGetHeadShaResponse { sha: head_sha })
}
+ /// Host-side handler for `proto::GetCommitData`: resolves the requested
+ /// SHAs against the local repository and replies with whatever commit data
+ /// could be loaded. Unparsable SHAs and commits whose load was cancelled
+ /// are silently omitted, so guests must treat the response as best-effort.
+ async fn handle_get_commit_data(
+ this: Entity<Self>,
+ envelope: TypedEnvelope<proto::GetCommitData>,
+ mut cx: AsyncApp,
+ ) -> Result<proto::GetCommitDataResponse> {
+ let repository_id = RepositoryId::from_proto(envelope.payload.repository_id);
+ let repository_handle = Self::repository_for_request(&this, repository_id, &mut cx)?;
+
+ // Drop SHAs that don't parse instead of failing the whole batch.
+ let shas: Vec<Oid> = envelope
+ .payload
+ .shas
+ .iter()
+ .filter_map(|s| Oid::from_str(s).ok())
+ .collect();
+
+ let mut commits = Vec::with_capacity(shas.len());
+ let mut receivers = Vec::new();
+
+ // Propagate the error if the repository entity was released. The `?` is
+ // a fix: the `Result` of `update` was previously discarded (tripping
+ // `unused_must_use`), silently replying with an empty response instead.
+ repository_handle.update(&mut cx, |repository, cx| {
+ for &sha in &shas {
+ match repository.fetch_commit_data(sha, true, cx) {
+ CommitDataState::Loaded(data) => {
+ commits.push(commit_data_to_proto(data));
+ }
+ CommitDataState::Loading(Some(shared)) => {
+ receivers.push(shared.clone());
+ }
+ CommitDataState::Loading(None) => {
+ // todo(git_graph) this could happen if the request fails, we should encode an error case
+ debug_panic!(
+ "This should never happen since we passed true into fetch commit data"
+ );
+ }
+ }
+ }
+ })?;
+
+ // Senders dropped on handler shutdown resolve as `Err(Canceled)` and
+ // are filtered out below rather than failing the whole response.
+ let results = future::join_all(receivers).await;
+
+ commits.extend(
+ results
+ .into_iter()
+ .filter_map(|result| result.ok())
+ .map(|data| commit_data_to_proto(&data)),
+ );
+
+ Ok(proto::GetCommitDataResponse { commits })
+ }
+
async fn handle_edit_ref(
this: Entity<Self>,
envelope: TypedEnvelope<proto::GitEditRef>,
@@ -4236,7 +4299,7 @@ impl Repository {
active_jobs: Default::default(),
initial_graph_data: Default::default(),
commit_data: Default::default(),
- graph_commit_data_handler: GraphCommitHandlerState::Closed,
+ commit_data_handler: CommitDataHandlerState::Closed,
}
}
@@ -4274,7 +4337,7 @@ impl Repository {
job_id: 0,
initial_graph_data: Default::default(),
commit_data: Default::default(),
- graph_commit_data_handler: GraphCommitHandlerState::Closed,
+ commit_data_handler: CommitDataHandlerState::Closed,
}
}
@@ -5000,40 +5063,95 @@ impl Repository {
Ok(())
}
- pub fn fetch_commit_data(&mut self, sha: Oid, cx: &mut Context<Self>) -> &CommitDataState {
- if !self.commit_data.contains_key(&sha) {
- match &self.graph_commit_data_handler {
- GraphCommitHandlerState::Open(handler) => {
- if handler.commit_data_request.try_send(sha).is_ok() {
- let old_value = self.commit_data.insert(sha, CommitDataState::Loading);
- debug_assert!(old_value.is_none(), "We should never overwrite commit data");
- }
- }
- GraphCommitHandlerState::Closed => {
- self.open_graph_commit_data_handler(cx);
- }
- GraphCommitHandlerState::Starting => {}
+ /// Returns the cached state for `sha`, scheduling a background fetch if
+ /// the commit isn't cached yet.
+ ///
+ /// When `await_result` is true, the returned `Loading` state carries a
+ /// shared receiver that resolves once the commit data arrives.
+ pub fn fetch_commit_data(
+ &mut self,
+ sha: Oid,
+ await_result: bool,
+ cx: &mut Context<Self>,
+ ) -> &CommitDataState {
+ if self.commit_data.contains_key(&sha) {
+ let data = &self.commit_data[&sha];
+
+ // Upgrade an in-flight fire-and-forget request to an awaitable one.
+ if let CommitDataState::Loading(None) = data
+ && await_result
+ {
+ let (tx, rx) = oneshot::channel();
+ self.commit_data
+ .insert(sha, CommitDataState::Loading(Some(rx.shared())));
+
+ let handler = self.get_handler(cx);
+ handler.completion_senders.insert(sha, tx);
+ }
+
+ return &self.commit_data[&sha];
+ }
-        self.commit_data
-            .get(&sha)
-            .unwrap_or(&CommitDataState::Loading)
+ let (state, completer) = if await_result {
+ let (tx, rx) = oneshot::channel();
+ (CommitDataState::Loading(Some(rx.shared())), Some(tx))
+ } else {
+ (CommitDataState::Loading(None), None)
+ };
+
+ self.commit_data.insert(sha, state);
+
+ let handler = self.get_handler(cx);
+ if let Some(tx) = completer {
+ handler.completion_senders.insert(sha, tx);
+ }
+ let mut has_failed = false;
+ if handler.commit_data_request.try_send(sha).is_ok() {
+ handler.pending_requests.insert(sha);
+ } else {
+ // The reader shut down before the request could be enqueued; roll
+ // back the entry so a later call can retry.
+ has_failed = true;
+ handler.completion_senders.remove(&sha);
+ debug_assert!(
+ matches!(
+ self.commit_data.remove(&sha),
+ Some(CommitDataState::Loading(_))
+ ),
+ "Commit data should still be loading when enqueueing the request fails"
+ );
+ }
+
+ self.commit_data.get(&sha).unwrap_or_else(|| {
+ // This fallback is reached exactly when enqueueing failed and the
+ // entry was rolled back above. The previous assertion asserted
+ // `!has_failed` — inverted — so it panicked (in debug builds) on
+ // that expected path. The redundant leading `&` (which produced a
+ // `&&CommitDataState` relying on deref coercion) is also dropped.
+ debug_assert!(has_failed, "Commit data entry should exist unless enqueueing failed");
+ &CommitDataState::Loading(None)
+ })
+ }
- fn open_graph_commit_data_handler(&mut self, cx: &mut Context<Self>) {
- self.graph_commit_data_handler = GraphCommitHandlerState::Starting;
+ /// Returns the open commit-data handler, lazily spawning one on first use
+ /// or after the previous handler shut down.
+ fn get_handler(&mut self, cx: &mut Context<Self>) -> &mut CommitDataHandler {
+ if matches!(self.commit_data_handler, CommitDataHandlerState::Closed) {
+ self.commit_data_handler =
+ CommitDataHandlerState::Open(self.open_commit_data_handler(cx));
+ }
+ match &mut self.commit_data_handler {
+ CommitDataHandlerState::Open(handler) => handler,
+ // Unreachable: the state was just set to `Open` above if it was closed.
+ CommitDataHandlerState::Closed => unreachable!(),
+ }
+ }
+
+ fn open_commit_data_handler(&self, cx: &Context<Self>) -> CommitDataHandler {
let state = self.repository_state.clone();
- let (result_tx, result_rx) = smol::channel::bounded::<(Oid, GraphCommitData)>(64);
+ let (result_tx, result_rx) = smol::channel::bounded::<(Oid, CommitData)>(64);
let (request_tx, request_rx) = smol::channel::unbounded::<Oid>();
let foreground_task = cx.spawn(async move |this, cx| {
while let Ok((sha, commit_data)) = result_rx.recv().await {
let result = this.update(cx, |this, cx| {
- let old_value = this
- .commit_data
- .insert(sha, CommitDataState::Loaded(Arc::new(commit_data)));
+ let data = Arc::new(commit_data);
+
+ if let CommitDataHandlerState::Open(handler) = &mut this.commit_data_handler {
+ handler.pending_requests.remove(&sha);
+ if let Some(completion_sender) = handler.completion_senders.remove(&sha) {
+ completion_sender.send(data.clone()).ok();
+ }
+ } else {
+ debug_panic!("The handler state has to be open for this task to exist");
+ }
+
+ let old_value = this.commit_data.insert(sha, CommitDataState::Loaded(data));
debug_assert!(
!matches!(old_value, Some(CommitDataState::Loaded(_))),
"We should never overwrite commit data"
@@ -5047,69 +5165,250 @@ impl Repository {
}
this.update(cx, |this, _cx| {
- this.graph_commit_data_handler = GraphCommitHandlerState::Closed;
+ let CommitDataHandlerState::Open(handler) = std::mem::replace(
+ &mut this.commit_data_handler,
+ CommitDataHandlerState::Closed,
+ ) else {
+ debug_panic!("The handler state has to be open for this task to exist");
+ return;
+ };
+
+ for sha in handler.pending_requests {
+ this.commit_data.remove(&sha);
+ }
})
.ok();
});
let request_tx_for_handler = request_tx;
+ let repository_id = self.id;
let background_executor = cx.background_executor().clone();
cx.background_spawn(async move {
- let backend = match state.await {
- Ok(RepositoryState::Local(LocalRepositoryState { backend, .. })) => backend,
- Ok(RepositoryState::Remote(_)) => {
- log::error!("commit_data_reader not supported for remote repositories");
- return;
+ match state.await {
+ Ok(RepositoryState::Local(LocalRepositoryState { backend, .. })) => {
+ Self::local_commit_data_reader(
+ backend,
+ request_rx,
+ result_tx,
+ background_executor,
+ )
+ .await;
+ }
+ Ok(RepositoryState::Remote(RemoteRepositoryState { project_id, client })) => {
+ Self::remote_commit_data_reader(
+ project_id,
+ client,
+ repository_id,
+ request_rx,
+ result_tx,
+ background_executor,
+ )
+ .await;
}
Err(error) => {
log::error!("failed to get repository state: {error}");
return;
}
};
+ })
+ .detach();
- let reader = match backend.commit_data_reader() {
- Ok(reader) => reader,
- Err(error) => {
- log::error!("failed to create commit data reader: {error:?}");
- return;
- }
- };
+ CommitDataHandler {
+ _task: foreground_task,
+ commit_data_request: request_tx_for_handler,
+ completion_senders: HashMap::default(),
+ pending_requests: HashSet::default(),
+ }
+ }
- loop {
- let timeout = background_executor.timer(std::time::Duration::from_secs(10));
+ /// Background loop serving commit-data requests from a local repository:
+ /// reads each requested SHA via the backend's commit-data reader and sends
+ /// results on `result_tx`. Exits (closing the handler) after 10s with no
+ /// incoming requests, or when either channel closes.
+ async fn local_commit_data_reader(
+ backend: Arc<dyn GitRepository>,
+ request_rx: smol::channel::Receiver<Oid>,
+ result_tx: smol::channel::Sender<(Oid, CommitData)>,
+ background_executor: BackgroundExecutor,
+ ) {
+ let reader = match backend.commit_data_reader() {
+ Ok(reader) => reader,
+ Err(error) => {
+ log::error!("failed to create commit data reader: {error:?}");
+ return;
+ }
+ };
-        futures::select_biased! {
-            sha = futures::FutureExt::fuse(request_rx.recv()) => {
-                let Ok(sha) = sha else {
-                    break;
-                };
+ loop {
+ // Idle timeout: restarts each iteration, so it measures quiet time.
+ let timeout = background_executor.timer(std::time::Duration::from_secs(10));
-            match reader.read(sha).await {
-                Ok(commit_data) => {
-                    if result_tx.send((sha, commit_data)).await.is_err() {
-                        break;
-                    }
-                }
-                Err(error) => {
-                    log::error!("failed to read commit data for {sha}: {error:?}");
+ futures::select_biased! {
+ sha = futures::FutureExt::fuse(request_rx.recv()) => {
+ let Ok(sha) = sha else {
+ break;
+ };
+
+ match reader.read(sha).await {
+ Ok(commit_data) => {
+ if result_tx.send((sha, commit_data)).await.is_err() {
+ break;
 }
 }
+ // Read failures are logged and skipped; the entry stays
+ // pending until the handler closes and cleans it up.
+ Err(error) => {
+ log::error!("failed to read commit data for {sha}: {error:?}");
+ }
 }
-            _ = futures::FutureExt::fuse(timeout) => {
-                break;
+ }
+ _ = futures::FutureExt::fuse(timeout) => {
+ break;
+ }
+ }
+ }
+
+ // Closing the result channel lets the foreground task clean up state.
+ drop(result_tx);
+ }
+
+ /// Background loop serving commit-data requests for a remote (collab)
+ /// repository: batches queued SHAs into `GetCommitData` RPCs to the host
+ /// and forwards decoded commits on `result_tx`.
+ ///
+ /// NOTE(review): when there are no in-flight responses, a single `Idle`
+ /// poll (≈5ms quiet window in `get_next_request`) breaks this loop and
+ /// closes the handler — far sooner than the 10s documented on
+ /// `CommitDataHandlerState::Closed` and used by the local reader. Confirm
+ /// this asymmetry is intentional.
+ async fn remote_commit_data_reader(
+ project_id: ProjectId,
+ client: AnyProtoClient,
+ repository_id: RepositoryId,
+ request_rx: smol::channel::Receiver<Oid>,
+ result_tx: smol::channel::Sender<(Oid, CommitData)>,
+ background_executor: BackgroundExecutor,
+ ) {
+ let mut response_futures =
+ FuturesUnordered::<BoxFuture<'static, Result<proto::GetCommitDataResponse>>>::new();
+ let mut accept_requests = true;
+ let mut next_request = Self::get_next_request(
+ project_id,
+ client.clone(),
+ repository_id,
+ &request_rx,
+ &background_executor,
+ )
+ .boxed()
+ .fuse();
+
+ loop {
+ // Done once the queue is closed and all responses have drained.
+ if !accept_requests && response_futures.is_empty() {
+ break;
+ }
+
+ // Nothing in flight: block on the next batch rather than selecting.
+ if response_futures.is_empty() {
+ match (&mut next_request).await {
+ NextCommitDataRequest::Request(request) => {
+ response_futures.push(request);
+ next_request = Self::get_next_request(
+ project_id,
+ client.clone(),
+ repository_id,
+ &request_rx,
+ &background_executor,
+ )
+ .boxed()
+ .fuse();
+ }
+ NextCommitDataRequest::Closed | NextCommitDataRequest::Idle => break,
+ }
+ }
-        drop(result_tx);
-    })
-    .detach();
+ let next_response = response_futures.next().fuse();
+ futures::pin_mut!(next_response);
-    self.graph_commit_data_handler = GraphCommitHandlerState::Open(GraphCommitDataHandler {
-        _task: foreground_task,
-        commit_data_request: request_tx_for_handler,
-    });
+ // Race accepting new batches against completing in-flight RPCs.
+ futures::select_biased! {
+ request = next_request => {
+ match request {
+ NextCommitDataRequest::Request(request) => {
+ response_futures.push(request);
+ }
+ NextCommitDataRequest::Idle => {}
+ NextCommitDataRequest::Closed => {
+ accept_requests = false;
+ }
+ }
+
+ if accept_requests {
+ next_request = Self::get_next_request(
+ project_id,
+ client.clone(),
+ repository_id,
+ &request_rx,
+ &background_executor,
+ )
+ .boxed()
+ .fuse();
+ }
+ }
+ result = next_response => {
+ let Some(result) = result else {
+ continue;
+ };
+
+ // Failed RPCs and undecodable commits are skipped; affected
+ // entries are cleaned up when the handler closes.
+ if let Ok(commit_data) = result {
+ for commit in commit_data.commits {
+ let Ok(commit_data) = commit_data_from_proto(commit) else {
+ continue;
+ };
+
+ if result_tx
+ .send((commit_data.sha, commit_data))
+ .await
+ .is_err()
+ {
+ return;
+ }
+ }
+ }
+ }
+ }
+ }
+
+ drop(result_tx);
+ }
+
+ /// Drains the request queue into a batch of at most 64 SHAs and issues a
+ /// single `GetCommitData` RPC for the batch.
+ ///
+ /// The 5ms timer restarts on each received SHA, so a batch closes after
+ /// 5ms of quiet (or at 64 entries). Returns `Idle` if nothing arrived, or
+ /// `Closed` if the queue is empty and the channel has shut down.
+ async fn get_next_request(
+ project_id: ProjectId,
+ client: AnyProtoClient,
+ repository_id: RepositoryId,
+ request_rx: &smol::channel::Receiver<Oid>,
+ background_executor: &BackgroundExecutor,
+ ) -> NextCommitDataRequest {
+ let mut queued_shas = Vec::with_capacity(64);
+
+ loop {
+ if queued_shas.len() >= 64 {
+ break;
+ }
+
+ let timeout = background_executor.timer(Duration::from_millis(5));
+
+ futures::select_biased! {
+ sha = futures::FutureExt::fuse(request_rx.recv()) => {
+ let Ok(sha) = sha else {
+ break;
+ };
+
+ queued_shas.push(sha);
+
+ }
+ _ = futures::FutureExt::fuse(timeout) => {
+ break;
+ }
+ }
+ }
+
+ if queued_shas.is_empty() && request_rx.is_closed() {
+ NextCommitDataRequest::Closed
+ } else if queued_shas.is_empty() {
+ NextCommitDataRequest::Idle
+ } else {
+ NextCommitDataRequest::Request(
+ client
+ .request(proto::GetCommitData {
+ project_id: project_id.to_proto(),
+ repository_id: repository_id.to_proto(),
+ shas: queued_shas.into_iter().map(|oid| oid.to_string()).collect(),
+ })
+ .boxed(),
+ )
+ }
+ }
fn buffer_store(&self, cx: &App) -> Option<Entity<BufferStore>> {
@@ -7625,6 +7924,35 @@ fn deserialize_blame_buffer_response(
Some(Blame { entries, messages })
}
+/// Serializes `CommitData` into its protobuf wire representation, rendering
+/// all SHAs as hex strings.
+fn commit_data_to_proto(commit: &CommitData) -> proto::CommitData {
+ proto::CommitData {
+ sha: commit.sha.to_string(),
+ parents: commit.parents.iter().map(|p| p.to_string()).collect(),
+ author_name: commit.author_name.to_string(),
+ author_email: commit.author_email.to_string(),
+ commit_timestamp: commit.commit_timestamp,
+ subject: commit.subject.to_string(),
+ message: commit.message.to_string(),
+ }
+}
+
+/// Deserializes a protobuf `CommitData` message back into the in-memory
+/// representation.
+///
+/// # Errors
+/// Returns an error if the commit SHA or any parent SHA fails to parse.
+fn commit_data_from_proto(commit: proto::CommitData) -> Result<CommitData> {
+ let sha = Oid::from_str(&commit.sha)?;
+ let mut parents = SmallVec::with_capacity(commit.parents.len());
+ for parent in &commit.parents {
+ parents.push(Oid::from_str(parent)?);
+ }
+ Ok(CommitData {
+ sha,
+ parents,
+ author_name: SharedString::from(commit.author_name),
+ author_email: SharedString::from(commit.author_email),
+ commit_timestamp: commit.commit_timestamp,
+ subject: SharedString::from(commit.subject),
+ message: SharedString::from(commit.message),
+ })
+}
+
fn branch_to_proto(branch: &git::repository::Branch) -> proto::Branch {
proto::Branch {
is_head: branch.is_head,
@@ -7735,6 +8063,354 @@ fn proto_to_commit_details(proto: &proto::GitCommitDetails) -> CommitDetails {
}
}
+#[cfg(any(test, feature = "test-support"))]
+impl Repository {
+ /// Test-only accessor: returns a snapshot of all fully-loaded commit data,
+ /// keyed by SHA. Entries still in a `Loading` state are omitted.
+ pub fn loaded_commit_data_for_test(&self) -> HashMap<Oid, CommitData> {
+ self.commit_data
+ .iter()
+ .filter_map(|(sha, state)| match state {
+ CommitDataState::Loaded(data) => Some((*sha, data.as_ref().clone())),
+ CommitDataState::Loading(_) => None,
+ })
+ .collect()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::Project;
+ use fs::FakeFs;
+ use gpui::TestAppContext;
+ use gpui::proptest::prelude::*;
+ use rand::{SeedableRng, rngs::StdRng};
+ use serde_json::json;
+ use settings::SettingsStore;
+ use std::path::Path;
+
+ /// Installs a test `SettingsStore` global, required before creating projects.
+ fn init_test(cx: &mut TestAppContext) {
+ cx.update(|cx| {
+ let settings_store = SettingsStore::test(cx);
+ cx.set_global(settings_store);
+ });
+ }
+
+ /// Checks every bookkeeping invariant between `commit_data` and the
+ /// handler's `pending_requests`/`completion_senders` maps. Which set of
+ /// invariants applies depends on whether the handler is open or closed.
+ fn verify_invariants(repository: &Repository) -> anyhow::Result<()> {
+ match &repository.commit_data_handler {
+ CommitDataHandlerState::Open(handler) => {
+ verify_loading_entries_are_pending(repository, handler)?;
+ verify_await_result_loading_entries_have_completion_senders(repository, handler)?;
+ verify_pending_requests_are_loading(repository, handler)?;
+ verify_completion_senders_are_await_result_loading(repository, handler)?;
+ verify_completion_senders_are_pending(handler)?;
+ verify_non_await_result_loading_entries_have_no_completion_sender(
+ repository, handler,
+ )?;
+ verify_loaded_entries_are_not_pending(repository, handler)?;
+ verify_loaded_entries_have_no_completion_sender(repository, handler)?;
+ }
+ CommitDataHandlerState::Closed => {
+ verify_closed_handler_invariants(repository)?;
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Invariant: every `Loading` entry has a matching pending request.
+ fn verify_loading_entries_are_pending(
+ repository: &Repository,
+ handler: &CommitDataHandler,
+ ) -> anyhow::Result<()> {
+ for (sha, state) in &repository.commit_data {
+ if matches!(state, CommitDataState::Loading(_)) {
+ anyhow::ensure!(
+ handler.pending_requests.contains(sha),
+ "loading commit data for {sha} must be tracked in pending_requests"
+ );
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Invariant: every awaitable `Loading(Some(..))` entry has a completion sender.
+ fn verify_await_result_loading_entries_have_completion_senders(
+ repository: &Repository,
+ handler: &CommitDataHandler,
+ ) -> anyhow::Result<()> {
+ for (sha, state) in &repository.commit_data {
+ if matches!(state, CommitDataState::Loading(Some(_))) {
+ anyhow::ensure!(
+ handler.completion_senders.contains_key(sha),
+ "await-result loading commit data for {sha} must have a completion sender"
+ );
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Invariant: every pending request corresponds to a `Loading` cache entry.
+ fn verify_pending_requests_are_loading(
+ repository: &Repository,
+ handler: &CommitDataHandler,
+ ) -> anyhow::Result<()> {
+ for sha in &handler.pending_requests {
+ anyhow::ensure!(
+ matches!(
+ repository.commit_data.get(sha),
+ Some(CommitDataState::Loading(_))
+ ),
+ "pending request for {sha} must correspond to loading commit data"
+ );
+ }
+
+ Ok(())
+ }
+
+ /// Invariant: every completion sender maps to an awaitable `Loading(Some(..))` entry.
+ fn verify_completion_senders_are_await_result_loading(
+ repository: &Repository,
+ handler: &CommitDataHandler,
+ ) -> anyhow::Result<()> {
+ for sha in handler.completion_senders.keys() {
+ anyhow::ensure!(
+ matches!(
+ repository.commit_data.get(sha),
+ Some(CommitDataState::Loading(Some(_)))
+ ),
+ "completion sender for {sha} must correspond to await-result loading commit data"
+ );
+ }
+
+ Ok(())
+ }
+
+ /// Invariant: every completion sender's SHA is also tracked as pending.
+ fn verify_completion_senders_are_pending(handler: &CommitDataHandler) -> anyhow::Result<()> {
+ for sha in handler.completion_senders.keys() {
+ anyhow::ensure!(
+ handler.pending_requests.contains(sha),
+ "completion sender for {sha} must also be tracked as pending"
+ );
+ }
+
+ Ok(())
+ }
+
+ /// Invariant: fire-and-forget `Loading(None)` entries never hold a completion sender.
+ fn verify_non_await_result_loading_entries_have_no_completion_sender(
+ repository: &Repository,
+ handler: &CommitDataHandler,
+ ) -> anyhow::Result<()> {
+ for (sha, state) in &repository.commit_data {
+ if matches!(state, CommitDataState::Loading(None)) {
+ anyhow::ensure!(
+ !handler.completion_senders.contains_key(sha),
+ "non-await-result loading commit data for {sha} must not have a completion sender"
+ );
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Invariant: `Loaded` entries have been removed from pending_requests.
+ fn verify_loaded_entries_are_not_pending(
+ repository: &Repository,
+ handler: &CommitDataHandler,
+ ) -> anyhow::Result<()> {
+ for (sha, state) in &repository.commit_data {
+ if matches!(state, CommitDataState::Loaded(_)) {
+ anyhow::ensure!(
+ !handler.pending_requests.contains(sha),
+ "loaded commit data for {sha} must not still be pending"
+ );
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Invariant: `Loaded` entries' completion senders have been consumed.
+ fn verify_loaded_entries_have_no_completion_sender(
+ repository: &Repository,
+ handler: &CommitDataHandler,
+ ) -> anyhow::Result<()> {
+ for (sha, state) in &repository.commit_data {
+ if matches!(state, CommitDataState::Loaded(_)) {
+ anyhow::ensure!(
+ !handler.completion_senders.contains_key(sha),
+ "loaded commit data for {sha} must not keep a completion sender"
+ );
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Invariant: once the handler closes, no `Loading` entries survive —
+ /// the handler's shutdown path removes every pending entry.
+ fn verify_closed_handler_invariants(repository: &Repository) -> anyhow::Result<()> {
+ for (sha, state) in &repository.commit_data {
+ anyhow::ensure!(
+ !matches!(state, CommitDataState::Loading(_)),
+ "closed handler must not keep loading commit data for {sha}"
+ );
+ }
+
+ Ok(())
+ }
+
+ /// Property test: issues random sequences of `fetch_commit_data` calls
+ /// (mixing awaitable and fire-and-forget requests, failing and missing
+ /// commits) and checks the handler invariants after every step, every
+ /// drain, and on every entity notify; finally asserts exactly the
+ /// requested, readable commits ended up loaded.
+ #[gpui::property_test(config = ProptestConfig {
+ cases: 20,
+ ..Default::default()
+ })]
+ async fn test_commit_data_random_invariants(
+ #[strategy = any::<u64>()] seed: u64,
+ #[strategy = gpui::proptest::collection::vec(0usize..2000, 1..200)] commit_indexes: Vec<
+ usize,
+ >,
+ #[strategy = gpui::proptest::collection::vec(any::<bool>(), 1..200)] await_results: Vec<
+ bool,
+ >,
+ #[strategy = gpui::proptest::collection::vec(0usize..2000, 0..200)] failing_commit_indexes: Vec<
+ usize,
+ >,
+ #[strategy = gpui::proptest::collection::vec(0usize..2000, 0..200)] missing_commit_indexes: Vec<
+ usize,
+ >,
+ cx: &mut TestAppContext,
+ ) {
+ init_test(cx);
+ let mut rng = StdRng::seed_from_u64(seed);
+
+ // Build a pool of 2000 SHAs; mark some as failing (reader errors) and
+ // omit others from the fake backend entirely (missing).
+ let commit_shas = (0..2000).map(|_| Oid::random(&mut rng)).collect::<Vec<_>>();
+ let failing_shas = failing_commit_indexes
+ .into_iter()
+ .map(|index| commit_shas[index % commit_shas.len()])
+ .collect::<HashSet<_>>();
+ let missing_shas = missing_commit_indexes
+ .into_iter()
+ .map(|index| commit_shas[index % commit_shas.len()])
+ .collect::<HashSet<_>>();
+ let commit_data = commit_shas
+ .iter()
+ .filter(|sha| !missing_shas.contains(sha))
+ .map(|sha| {
+ (
+ CommitData {
+ sha: *sha,
+ parents: SmallVec::new(),
+ author_name: SharedString::from(format!("Author {sha}")),
+ author_email: SharedString::from(format!("{sha}@example.com")),
+ commit_timestamp: rng.random_range(0..10_000),
+ subject: SharedString::from(format!("Subject {sha}")),
+ message: SharedString::from(format!("Subject {sha}\n\nBody for {sha}")),
+ },
+ failing_shas.contains(sha),
+ )
+ })
+ .collect::<Vec<_>>();
+ // Only requested SHAs that are neither failing nor missing should load.
+ let expected_loaded_shas = commit_indexes
+ .iter()
+ .map(|index| commit_shas[index % commit_shas.len()])
+ .filter(|sha| !failing_shas.contains(sha) && !missing_shas.contains(sha))
+ .collect::<HashSet<_>>();
+
+ let fs = FakeFs::new(cx.executor());
+ fs.insert_tree(
+ Path::new("/project"),
+ json!({
+ ".git": {},
+ "file.txt": "content",
+ }),
+ )
+ .await;
+ fs.set_commit_data(Path::new("/project/.git"), commit_data);
+
+ let project = Project::test(fs.clone(), [Path::new("/project")], cx).await;
+ project
+ .update(cx, |project, cx| project.git_scans_complete(cx))
+ .await;
+
+ let repository = project.read_with(cx, |project, cx| {
+ project
+ .active_repository(cx)
+ .expect("should have a repository")
+ });
+
+ // Also re-check the invariants on every repository notify.
+ cx.update(|cx| {
+ cx.observe(&repository, |repo, cx| {
+ verify_invariants(repo.read(cx))
+ .context("Invariant weren't held after a cx.notify")
+ .unwrap();
+ })
+ })
+ .detach();
+
+ // Drive the requests in random-sized chunks, draining between chunks.
+ let mut next_step = 0;
+ while next_step < commit_indexes.len() {
+ let remaining_steps = commit_indexes.len() - next_step;
+ let chunk_size = rng.random_range(1..=remaining_steps.min(16));
+ let chunk_end = next_step + chunk_size;
+
+ for step in next_step..chunk_end {
+ let sha = commit_shas[commit_indexes[step] % commit_shas.len()];
+ let await_result = await_results[step % await_results.len()];
+
+ repository.update(cx, |repository, cx| {
+ repository.fetch_commit_data(sha, await_result, cx);
+ verify_invariants(repository)
+ .with_context(|| {
+ format!(
+ "commit data invariant violation after step {} for sha {}",
+ step + 1,
+ sha,
+ )
+ })
+ .unwrap();
+ });
+ }
+
+ cx.run_until_parked();
+ repository.read_with(cx, |repository, _cx| {
+ verify_invariants(repository)
+ .with_context(|| {
+ format!(
+ "commit data invariant violation after draining through step {}",
+ chunk_end,
+ )
+ })
+ .unwrap();
+ });
+
+ next_step = chunk_end;
+ }
+
+ cx.run_until_parked();
+ repository.read_with(cx, |repository, _cx| {
+ verify_invariants(repository)
+ .with_context(|| "commit data invariant violation after final drain".to_string())
+ .unwrap();
+
+ let loaded_shas = repository
+ .commit_data
+ .iter()
+ .filter_map(|(sha, state)| match state {
+ CommitDataState::Loaded(_) => Some(*sha),
+ CommitDataState::Loading(_) => None,
+ })
+ .collect::<HashSet<_>>();
+ let missing_loaded_shas = expected_loaded_shas
+ .difference(&loaded_shas)
+ .copied()
+ .collect::<Vec<_>>();
+ let unexpected_loaded_shas = loaded_shas
+ .difference(&expected_loaded_shas)
+ .copied()
+ .collect::<Vec<_>>();
+ assert!(
+ missing_loaded_shas.is_empty() && unexpected_loaded_shas.is_empty(),
+ "loaded commit data SHAs after final drain did not match expectation. missing: {:?}, unexpected: {:?}",
+ missing_loaded_shas,
+ unexpected_loaded_shas,
+ );
+ });
+ }
+}
+
/// This snapshot computes the repository state on the foreground thread while
/// running the git commands on the background thread. We update branch, head,
/// remotes, and worktrees first so the UI can react sooner, then compute file