diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index 084d43af1b86eda598f1fadfca726e29b82bac2f..538b0fa4b0101be73c64d366faf09238e5d8da16 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -28,10 +28,7 @@ use std::{ convert::TryFrom, fmt::Write as _, future::Future, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, Weak, - }, + sync::{Arc, Weak}, time::{Duration, Instant}, }; use thiserror::Error; @@ -232,12 +229,8 @@ impl Drop for Subscription { impl Client { pub fn new(http: Arc) -> Arc { - lazy_static! { - static ref NEXT_CLIENT_ID: AtomicUsize = AtomicUsize::default(); - } - Arc::new(Self { - id: NEXT_CLIENT_ID.fetch_add(1, Ordering::SeqCst), + id: 0, peer: Peer::new(), http, state: Default::default(), @@ -257,6 +250,12 @@ impl Client { self.http.clone() } + #[cfg(any(test, feature = "test-support"))] + pub fn set_id(&mut self, id: usize) -> &Self { + self.id = id; + self + } + #[cfg(any(test, feature = "test-support"))] pub fn tear_down(&self) { let mut state = self.state.write(); diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index ce494db59bb6222b90533d0129f86c21e651e697..96c93eb93494b6a7d4030ff105d13fa439ea2178 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -2117,7 +2117,7 @@ pub mod tests { Self { background, users: Default::default(), - next_user_id: Mutex::new(1), + next_user_id: Mutex::new(0), projects: Default::default(), worktree_extensions: Default::default(), next_project_id: Mutex::new(1), @@ -2181,6 +2181,7 @@ pub mod tests { } async fn get_user_by_id(&self, id: UserId) -> Result> { + self.background.simulate_random_delay().await; Ok(self.get_users_by_ids(vec![id]).await?.into_iter().next()) } @@ -2195,6 +2196,7 @@ pub mod tests { } async fn get_user_by_github_login(&self, github_login: &str) -> Result> { + self.background.simulate_random_delay().await; Ok(self .users .lock() @@ -2228,6 +2230,7 @@ pub mod tests { } async fn get_invite_code_for_user(&self, _id: UserId) -> Result> { + self.background.simulate_random_delay().await; Ok(None) } @@ -2265,6 +2268,7 @@ pub mod tests { } async fn unregister_project(&self, project_id: ProjectId) -> Result<()> { + self.background.simulate_random_delay().await; self.projects .lock() .get_mut(&project_id) @@ -2370,6 +2374,7 @@ pub mod tests { requester_id: UserId, responder_id: UserId, ) -> Result<()> { + self.background.simulate_random_delay().await; let mut contacts = self.contacts.lock(); for contact in contacts.iter_mut() { if contact.requester_id == requester_id && contact.responder_id == responder_id { @@ -2399,6 +2404,7 @@ pub mod tests { } async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<()> { + self.background.simulate_random_delay().await; self.contacts.lock().retain(|contact| { !(contact.requester_id == requester_id && contact.responder_id == responder_id) }); @@ -2410,6 +2416,7 @@ pub mod tests { user_id: UserId, contact_user_id: UserId, ) -> Result<()> { + self.background.simulate_random_delay().await; let mut contacts = self.contacts.lock(); for contact in contacts.iter_mut() { if contact.requester_id == contact_user_id @@ -2436,6 +2443,7 @@ pub mod tests { requester_id: UserId, accept: bool, ) -> Result<()> { + self.background.simulate_random_delay().await; let mut contacts = self.contacts.lock(); for (ix, contact) in contacts.iter_mut().enumerate() { if contact.requester_id == requester_id && contact.responder_id == responder_id { @@ -2631,6 +2639,7 @@ pub mod tests { count: usize, before_id: Option, ) -> Result> { + self.background.simulate_random_delay().await; let mut messages = self .channel_messages .lock() diff --git a/crates/collab/src/integration_tests.rs b/crates/collab/src/integration_tests.rs index facef17b63b52192f668fa1aaee0aeb777e7a665..4d23c00d42252f0103b3731288a4ec3f959e8d49 100644 --- a/crates/collab/src/integration_tests.rs +++ b/crates/collab/src/integration_tests.rs @@ -50,7 +50,6 @@ use std::{ time::Duration, }; use theme::ThemeRegistry; -use tokio::sync::RwLockReadGuard; use workspace::{Item, SplitDirection, ToggleFollow, Workspace}; #[ctor::ctor] @@ -589,7 +588,7 @@ async fn test_offline_projects( deterministic.run_until_parked(); assert!(server .store - .read() + .lock() .await .project_metadata_for_user(user_a) .is_empty()); @@ -620,7 +619,7 @@ async fn test_offline_projects( cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT); assert!(server .store - .read() + .lock() .await .project_metadata_for_user(user_a) .is_empty()); @@ -1446,7 +1445,7 @@ async fn test_collaborating_with_diagnostics( // Wait for server to see the diagnostics update. deterministic.run_until_parked(); { - let store = server.store.read().await; + let store = server.store.lock().await; let project = store.project(ProjectId::from_proto(project_id)).unwrap(); let worktree = project.worktrees.get(&worktree_id.to_proto()).unwrap(); assert!(!worktree.diagnostic_summaries.is_empty()); @@ -1472,6 +1471,7 @@ async fn test_collaborating_with_diagnostics( // Join project as client C and observe the diagnostics. let project_c = client_c.build_remote_project(&project_a, cx_a, cx_c).await; + deterministic.run_until_parked(); project_c.read_with(cx_c, |project, cx| { assert_eq!( project.diagnostic_summaries(cx).collect::>(), @@ -3171,7 +3171,7 @@ async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { assert_eq!( server - .state() + .store() .await .channel(channel_id) .unwrap() @@ -4425,8 +4425,16 @@ async fn test_random_collaboration( let mut server = TestServer::start(cx.foreground(), cx.background()).await; let db = server.app_state.db.clone(); let host_user_id = db.create_user("host", None, false).await.unwrap(); - for username in ["guest-1", "guest-2", "guest-3", "guest-4"] { + let mut available_guests = vec![ + "guest-1".to_string(), + "guest-2".to_string(), + "guest-3".to_string(), + "guest-4".to_string(), + ]; + + for username in &available_guests { let guest_user_id = db.create_user(username, None, false).await.unwrap(); + assert_eq!(*username, format!("guest-{}", guest_user_id)); server .app_state .db @@ -4620,12 +4628,7 @@ async fn test_random_collaboration( } else { max_operations }; - let mut available_guests = vec![ - "guest-1".to_string(), - "guest-2".to_string(), - "guest-3".to_string(), - "guest-4".to_string(), - ]; + let mut operations = 0; while operations < max_operations { if operations == disconnect_host_at { @@ -4656,7 +4659,7 @@ async fn test_random_collaboration( .unwrap(); let contacts = server .store - .read() + .lock() .await .build_initial_contacts_update(contacts) .contacts; @@ -4728,6 +4731,7 @@ async fn test_random_collaboration( server.disconnect_client(removed_guest_id); deterministic.advance_clock(RECEIVE_TIMEOUT); deterministic.start_waiting(); + log::info!("Waiting for guest {} to exit...", removed_guest_id); let (guest, guest_project, mut guest_cx, guest_err) = guest.await; deterministic.finish_waiting(); server.allow_connections(); @@ -4740,7 +4744,7 @@ async fn test_random_collaboration( let contacts = server.app_state.db.get_contacts(*user_id).await.unwrap(); let contacts = server .store - .read() + .lock() .await .build_initial_contacts_update(contacts) .contacts; @@ -4944,6 +4948,7 @@ impl TestServer { Arc::get_mut(&mut client) .unwrap() + .set_id(user_id.0 as usize) .override_authenticate(move |cx| { cx.spawn(|_| async move { let access_token = "the-token".to_string(); @@ -5071,10 +5076,6 @@ impl TestServer { }) } - async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> { - self.server.store.read().await - } - async fn condition(&mut self, mut predicate: F) where F: FnMut(&Store) -> bool, @@ -5083,7 +5084,7 @@ impl TestServer { self.foreground.parking_forbidden(), "you must call forbid_parking to use server conditions so we don't block indefinitely" ); - while !(predicate)(&*self.server.store.read().await) { + while !(predicate)(&*self.server.store.lock().await) { self.foreground.start_waiting(); self.notifications.next().await; self.foreground.finish_waiting(); diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 79c1e53a0b5f906cb961dc73a2348d1866d7fcd2..b7b0e00f2dfb0e0c79dca4b1cec8467f59f22ad3 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -51,7 +51,7 @@ use std::{ }; use time::OffsetDateTime; use tokio::{ - sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}, + sync::{Mutex, MutexGuard}, time::Sleep, }; use tower::ServiceBuilder; @@ -97,7 +97,7 @@ impl Response { pub struct Server { peer: Arc, - pub(crate) store: RwLock, + pub(crate) store: Mutex, app_state: Arc, handlers: HashMap, notifications: Option>, @@ -115,13 +115,8 @@ pub struct RealExecutor; const MESSAGE_COUNT_PER_PAGE: usize = 100; const MAX_MESSAGE_LEN: usize = 1024; -struct StoreReadGuard<'a> { - guard: RwLockReadGuard<'a, Store>, - _not_send: PhantomData>, -} - -struct StoreWriteGuard<'a> { - guard: RwLockWriteGuard<'a, Store>, +pub(crate) struct StoreGuard<'a> { + guard: MutexGuard<'a, Store>, _not_send: PhantomData>, } @@ -129,7 +124,7 @@ struct StoreWriteGuard<'a> { pub struct ServerSnapshot<'a> { peer: &'a Peer, #[serde(serialize_with = "serialize_deref")] - store: RwLockReadGuard<'a, Store>, + store: StoreGuard<'a>, } pub fn serialize_deref(value: &T, serializer: S) -> Result @@ -384,7 +379,7 @@ impl Server { ).await?; { - let mut store = this.store_mut().await; + let mut store = this.store().await; store.add_connection(connection_id, user_id, user.admin); this.peer.send(connection_id, store.build_initial_contacts_update(contacts))?; @@ -471,7 +466,7 @@ impl Server { let mut projects_to_unregister = Vec::new(); let removed_user_id; { - let mut store = self.store_mut().await; + let mut store = self.store().await; let removed_connection = store.remove_connection(connection_id)?; for (project_id, project) in removed_connection.hosted_projects { @@ -605,7 +600,7 @@ impl Server { .await .user_id_for_connection(request.sender_id)?; let project_id = self.app_state.db.register_project(user_id).await?; - self.store_mut() + self.store() .await .register_project(request.sender_id, project_id)?; @@ -623,7 +618,7 @@ impl Server { ) -> Result<()> { let project_id = ProjectId::from_proto(request.payload.project_id); let (user_id, project) = { - let mut state = self.store_mut().await; + let mut state = self.store().await; let project = state.unregister_project(project_id, request.sender_id)?; (state.user_id_for_connection(request.sender_id)?, project) }; @@ -725,7 +720,7 @@ impl Server { return Err(anyhow!("no such project"))?; } - self.store_mut().await.request_join_project( + self.store().await.request_join_project( guest_user_id, project_id, response.into_receipt(), @@ -747,7 +742,7 @@ impl Server { let host_user_id; { - let mut state = self.store_mut().await; + let mut state = self.store().await; let project_id = ProjectId::from_proto(request.payload.project_id); let project = state.project(project_id)?; if project.host_connection_id != request.sender_id { @@ -791,20 +786,10 @@ impl Server { let worktrees = project .worktrees .iter() - .filter_map(|(id, shared_worktree)| { - let worktree = project.worktrees.get(&id)?; - Some(proto::Worktree { - id: *id, - root_name: worktree.root_name.clone(), - entries: shared_worktree.entries.values().cloned().collect(), - diagnostic_summaries: shared_worktree - .diagnostic_summaries - .values() - .cloned() - .collect(), - visible: worktree.visible, - scan_id: shared_worktree.scan_id, - }) + .map(|(id, worktree)| proto::WorktreeMetadata { + id: *id, + root_name: worktree.root_name.clone(), + visible: worktree.visible, }) .collect::>(); @@ -840,14 +825,15 @@ impl Server { } } - for (receipt, replica_id) in receipts_with_replica_ids { + // First, we send the metadata associated with each worktree. + for (receipt, replica_id) in &receipts_with_replica_ids { self.peer.respond( - receipt, + receipt.clone(), proto::JoinProjectResponse { variant: Some(proto::join_project_response::Variant::Accept( proto::join_project_response::Accept { worktrees: worktrees.clone(), - replica_id: replica_id as u32, + replica_id: *replica_id as u32, collaborators: collaborators.clone(), language_servers: project.language_servers.clone(), }, @@ -855,6 +841,43 @@ impl Server { }, )?; } + + for (worktree_id, worktree) in &project.worktrees { + #[cfg(any(test, feature = "test-support"))] + const MAX_CHUNK_SIZE: usize = 2; + #[cfg(not(any(test, feature = "test-support")))] + const MAX_CHUNK_SIZE: usize = 256; + + // Stream this worktree's entries. + let message = proto::UpdateWorktree { + project_id: project_id.to_proto(), + worktree_id: *worktree_id, + root_name: worktree.root_name.clone(), + updated_entries: worktree.entries.values().cloned().collect(), + removed_entries: Default::default(), + scan_id: worktree.scan_id, + is_last_update: worktree.is_complete, + }; + for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) { + for (receipt, _) in &receipts_with_replica_ids { + self.peer.send(receipt.sender_id, update.clone())?; + } + } + + // Stream this worktree's diagnostics. + for summary in worktree.diagnostic_summaries.values() { + for (receipt, _) in &receipts_with_replica_ids { + self.peer.send( + receipt.sender_id, + proto::UpdateDiagnosticSummary { + project_id: project_id.to_proto(), + worktree_id: *worktree_id, + summary: Some(summary.clone()), + }, + )?; + } + } + } } self.update_user_contacts(host_user_id).await?; @@ -869,7 +892,7 @@ impl Server { let project_id = ProjectId::from_proto(request.payload.project_id); let project; { - let mut store = self.store_mut().await; + let mut store = self.store().await; project = store.leave_project(sender_id, project_id)?; tracing::info!( %project_id, @@ -920,7 +943,7 @@ impl Server { let project_id = ProjectId::from_proto(request.payload.project_id); let user_id; { - let mut state = self.store_mut().await; + let mut state = self.store().await; user_id = state.user_id_for_connection(request.sender_id)?; let guest_connection_ids = state .read_project(project_id, request.sender_id)? @@ -939,7 +962,7 @@ impl Server { self: Arc, request: TypedEnvelope, ) -> Result<()> { - self.store_mut().await.register_project_activity( + self.store().await.register_project_activity( ProjectId::from_proto(request.payload.project_id), request.sender_id, )?; @@ -954,7 +977,7 @@ impl Server { let project_id = ProjectId::from_proto(request.payload.project_id); let worktree_id = request.payload.worktree_id; let (connection_ids, metadata_changed, extension_counts) = { - let mut store = self.store_mut().await; + let mut store = self.store().await; let (connection_ids, metadata_changed, extension_counts) = store.update_worktree( request.sender_id, project_id, @@ -963,6 +986,7 @@ impl Server { &request.payload.removed_entries, &request.payload.updated_entries, request.payload.scan_id, + request.payload.is_last_update, )?; (connection_ids, metadata_changed, extension_counts.clone()) }; @@ -995,7 +1019,7 @@ impl Server { .summary .clone() .ok_or_else(|| anyhow!("invalid summary"))?; - let receiver_ids = self.store_mut().await.update_diagnostic_summary( + let receiver_ids = self.store().await.update_diagnostic_summary( ProjectId::from_proto(request.payload.project_id), request.payload.worktree_id, request.sender_id, @@ -1013,7 +1037,7 @@ impl Server { self: Arc, request: TypedEnvelope, ) -> Result<()> { - let receiver_ids = self.store_mut().await.start_language_server( + let receiver_ids = self.store().await.start_language_server( ProjectId::from_proto(request.payload.project_id), request.sender_id, request @@ -1052,20 +1076,23 @@ impl Server { where T: EntityMessage + RequestMessage, { + let project_id = ProjectId::from_proto(request.payload.remote_entity_id()); let host_connection_id = self .store() .await - .read_project( - ProjectId::from_proto(request.payload.remote_entity_id()), - request.sender_id, - )? + .read_project(project_id, request.sender_id)? .host_connection_id; + let payload = self + .peer + .forward_request(request.sender_id, host_connection_id, request.payload) + .await?; - response.send( - self.peer - .forward_request(request.sender_id, host_connection_id, request.payload) - .await?, - )?; + // Ensure project still exists by the time we get the response from the host. + self.store() + .await + .read_project(project_id, request.sender_id)?; + + response.send(payload)?; Ok(()) } @@ -1106,7 +1133,7 @@ impl Server { ) -> Result<()> { let project_id = ProjectId::from_proto(request.payload.project_id); let receiver_ids = { - let mut store = self.store_mut().await; + let mut store = self.store().await; store.register_project_activity(project_id, request.sender_id)?; store.project_connection_ids(project_id, request.sender_id)? }; @@ -1173,7 +1200,7 @@ impl Server { let leader_id = ConnectionId(request.payload.leader_id); let follower_id = request.sender_id; { - let mut store = self.store_mut().await; + let mut store = self.store().await; if !store .project_connection_ids(project_id, follower_id)? .contains(&leader_id) @@ -1198,7 +1225,7 @@ impl Server { async fn unfollow(self: Arc, request: TypedEnvelope) -> Result<()> { let project_id = ProjectId::from_proto(request.payload.project_id); let leader_id = ConnectionId(request.payload.leader_id); - let mut store = self.store_mut().await; + let mut store = self.store().await; if !store .project_connection_ids(project_id, request.sender_id)? .contains(&leader_id) @@ -1216,7 +1243,7 @@ impl Server { request: TypedEnvelope, ) -> Result<()> { let project_id = ProjectId::from_proto(request.payload.project_id); - let mut store = self.store_mut().await; + let mut store = self.store().await; store.register_project_activity(project_id, request.sender_id)?; let connection_ids = store.project_connection_ids(project_id, request.sender_id)?; let leader_id = request @@ -1474,7 +1501,7 @@ impl Server { Err(anyhow!("access denied"))?; } - self.store_mut() + self.store() .await .join_channel(request.sender_id, channel_id); let messages = self @@ -1516,7 +1543,7 @@ impl Server { Err(anyhow!("access denied"))?; } - self.store_mut() + self.store() .await .leave_channel(request.sender_id, channel_id); @@ -1624,25 +1651,13 @@ impl Server { Ok(()) } - async fn store<'a>(self: &'a Arc) -> StoreReadGuard<'a> { - #[cfg(test)] - tokio::task::yield_now().await; - let guard = self.store.read().await; - #[cfg(test)] - tokio::task::yield_now().await; - StoreReadGuard { - guard, - _not_send: PhantomData, - } - } - - async fn store_mut<'a>(self: &'a Arc) -> StoreWriteGuard<'a> { + pub(crate) async fn store<'a>(&'a self) -> StoreGuard<'a> { #[cfg(test)] tokio::task::yield_now().await; - let guard = self.store.write().await; + let guard = self.store.lock().await; #[cfg(test)] tokio::task::yield_now().await; - StoreWriteGuard { + StoreGuard { guard, _not_send: PhantomData, } @@ -1650,21 +1665,13 @@ impl Server { pub async fn snapshot<'a>(self: &'a Arc) -> ServerSnapshot<'a> { ServerSnapshot { - store: self.store.read().await, + store: self.store().await, peer: &self.peer, } } } -impl<'a> Deref for StoreReadGuard<'a> { - type Target = Store; - - fn deref(&self) -> &Self::Target { - &*self.guard - } -} - -impl<'a> Deref for StoreWriteGuard<'a> { +impl<'a> Deref for StoreGuard<'a> { type Target = Store; fn deref(&self) -> &Self::Target { @@ -1672,13 +1679,13 @@ impl<'a> Deref for StoreWriteGuard<'a> { } } -impl<'a> DerefMut for StoreWriteGuard<'a> { +impl<'a> DerefMut for StoreGuard<'a> { fn deref_mut(&mut self) -> &mut Self::Target { &mut *self.guard } } -impl<'a> Drop for StoreWriteGuard<'a> { +impl<'a> Drop for StoreGuard<'a> { fn drop(&mut self) { #[cfg(test)] self.check_invariants(); diff --git a/crates/collab/src/rpc/store.rs b/crates/collab/src/rpc/store.rs index d1eb4a3be6d50a20c993e7b9cadacfd5de7e459d..1d36e971e2db1d9160c12c7c3cb538df0e900814 100644 --- a/crates/collab/src/rpc/store.rs +++ b/crates/collab/src/rpc/store.rs @@ -62,6 +62,7 @@ pub struct Worktree { #[serde(skip)] pub diagnostic_summaries: BTreeMap, pub scan_id: u64, + pub is_complete: bool, } #[derive(Default)] @@ -615,6 +616,7 @@ impl Store { removed_entries: &[u64], updated_entries: &[proto::Entry], scan_id: u64, + is_last_update: bool, ) -> Result<(Vec, bool, HashMap)> { let project = self.write_project(project_id, connection_id)?; let connection_ids = project.connection_ids(); @@ -657,6 +659,7 @@ impl Store { } worktree.scan_id = scan_id; + worktree.is_complete = is_last_update; Ok(( connection_ids, metadata_changed, diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 995b7bdb3e91ec2a486de8107f3378c1ae0b5d08..d07bcdb9640b891a0ee335a709bc33cb1d49b008 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -507,10 +507,9 @@ impl Project { let mut worktrees = Vec::new(); for worktree in response.worktrees { - let (worktree, load_task) = cx + let worktree = cx .update(|cx| Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx)); worktrees.push(worktree); - load_task.detach(); } let (opened_buffer_tx, opened_buffer_rx) = watch::channel(); @@ -1102,7 +1101,7 @@ impl Project { .ok_or_else(|| anyhow!("missing entry in response"))?; worktree .update(&mut cx, |worktree, cx| { - worktree.as_remote().unwrap().insert_entry( + worktree.as_remote_mut().unwrap().insert_entry( entry, response.worktree_scan_id as usize, cx, @@ -1145,7 +1144,7 @@ impl Project { .ok_or_else(|| anyhow!("missing entry in response"))?; worktree .update(&mut cx, |worktree, cx| { - worktree.as_remote().unwrap().insert_entry( + worktree.as_remote_mut().unwrap().insert_entry( entry, response.worktree_scan_id as usize, cx, @@ -1188,7 +1187,7 @@ impl Project { .ok_or_else(|| anyhow!("missing entry in response"))?; worktree .update(&mut cx, |worktree, cx| { - worktree.as_remote().unwrap().insert_entry( + worktree.as_remote_mut().unwrap().insert_entry( entry, response.worktree_scan_id as usize, cx, @@ -1221,7 +1220,7 @@ impl Project { .await?; worktree .update(&mut cx, move |worktree, cx| { - worktree.as_remote().unwrap().delete_entry( + worktree.as_remote_mut().unwrap().delete_entry( entry_id, response.worktree_scan_id as usize, cx, @@ -1352,12 +1351,13 @@ impl Project { let client = self.client.clone(); cx.foreground() .spawn(async move { - share.await?; client.send(proto::RespondToJoinProjectRequest { requester_id, project_id, allow, - }) + })?; + share.await?; + anyhow::Ok(()) }) .detach_and_log_err(cx); } @@ -4552,18 +4552,9 @@ impl Project { { this.worktrees.push(WorktreeHandle::Strong(old_worktree)); } else { - let worktree = proto::Worktree { - id: worktree.id, - root_name: worktree.root_name, - entries: Default::default(), - diagnostic_summaries: Default::default(), - visible: worktree.visible, - scan_id: 0, - }; - let (worktree, load_task) = + let worktree = Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx); this.add_worktree(&worktree, cx); - load_task.detach(); } } @@ -4587,8 +4578,8 @@ impl Project { if let Some(worktree) = this.worktree_for_id(worktree_id, cx) { worktree.update(cx, |worktree, _| { let worktree = worktree.as_remote_mut().unwrap(); - worktree.update_from_remote(envelope) - })?; + worktree.update_from_remote(envelope.payload); + }); } Ok(()) }) @@ -8125,7 +8116,10 @@ mod tests { } #[gpui::test(retries = 5)] - async fn test_rescan_and_remote_updates(cx: &mut gpui::TestAppContext) { + async fn test_rescan_and_remote_updates( + deterministic: Arc, + cx: &mut gpui::TestAppContext, + ) { let dir = temp_tree(json!({ "a": { "file1": "", @@ -8169,17 +8163,24 @@ mod tests { // Create a remote copy of this worktree. let tree = project.read_with(cx, |project, cx| project.worktrees(cx).next().unwrap()); let initial_snapshot = tree.read_with(cx, |tree, _| tree.as_local().unwrap().snapshot()); - let (remote, load_task) = cx.update(|cx| { + let remote = cx.update(|cx| { Worktree::remote( 1, 1, - initial_snapshot.to_proto(&Default::default(), true), + proto::WorktreeMetadata { + id: initial_snapshot.id().to_proto(), + root_name: initial_snapshot.root_name().into(), + visible: true, + }, rpc.clone(), cx, ) }); - // tree - load_task.await; + remote.update(cx, |remote, _| { + let update = initial_snapshot.build_initial_update(1); + remote.as_remote_mut().unwrap().update_from_remote(update); + }); + deterministic.run_until_parked(); cx.read(|cx| { assert!(!buffer2.read(cx).is_dirty()); @@ -8245,19 +8246,16 @@ mod tests { // Update the remote worktree. Check that it becomes consistent with the // local worktree. remote.update(cx, |remote, cx| { - let update_message = tree.read(cx).as_local().unwrap().snapshot().build_update( + let update = tree.read(cx).as_local().unwrap().snapshot().build_update( &initial_snapshot, 1, 1, true, ); - remote - .as_remote_mut() - .unwrap() - .snapshot - .apply_remote_update(update_message) - .unwrap(); - + remote.as_remote_mut().unwrap().update_from_remote(update); + }); + deterministic.run_until_parked(); + remote.read_with(cx, |remote, _| { assert_eq!( remote .paths() diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index 472dacc0ea59e77151b3cf50b16f8c1f58569fee..435cf59057827d9c63c197eeb1086257aef4135e 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -7,9 +7,9 @@ use super::{ }; use ::ignore::gitignore::{Gitignore, GitignoreBuilder}; use anyhow::{anyhow, Context, Result}; -use client::{proto, Client, TypedEnvelope}; +use client::{proto, Client}; use clock::ReplicaId; -use collections::HashMap; +use collections::{HashMap, VecDeque}; use futures::{ channel::{ mpsc::{self, UnboundedSender}, @@ -40,11 +40,11 @@ use std::{ ffi::{OsStr, OsString}, fmt, future::Future, - mem, ops::{Deref, DerefMut}, os::unix::prelude::{OsStrExt, OsStringExt}, path::{Path, PathBuf}, sync::{atomic::AtomicUsize, Arc}, + task::Poll, time::{Duration, SystemTime}, }; use sum_tree::{Bias, Edit, SeekTarget, SumTree, TreeMap}; @@ -82,7 +82,7 @@ pub struct RemoteWorktree { project_id: u64, client: Arc, updates_tx: Option>, - last_scan_id_rx: watch::Receiver, + snapshot_subscriptions: VecDeque<(usize, oneshot::Sender<()>)>, replica_id: ReplicaId, diagnostic_summaries: TreeMap, visible: bool, @@ -96,6 +96,7 @@ pub struct Snapshot { entries_by_path: SumTree, entries_by_id: SumTree, scan_id: usize, + is_complete: bool, } #[derive(Clone)] @@ -124,13 +125,16 @@ impl DerefMut for LocalSnapshot { #[derive(Clone, Debug)] enum ScanState { Idle, - Scanning, + /// The worktree is performing its initial scan of the filesystem. + Initializing, + /// The worktree is updating in response to filesystem events. + Updating, Err(Arc), } struct ShareState { project_id: u64, - snapshots_tx: Sender, + snapshots_tx: watch::Sender, _maintain_remote_snapshot: Option>>, } @@ -171,10 +175,10 @@ impl Worktree { pub fn remote( project_remote_id: u64, replica_id: ReplicaId, - worktree: proto::Worktree, + worktree: proto::WorktreeMetadata, client: Arc, cx: &mut MutableAppContext, - ) -> (ModelHandle, Task<()>) { + ) -> ModelHandle { let remote_id = worktree.id; let root_char_bag: CharBag = worktree .root_name @@ -189,13 +193,13 @@ impl Worktree { root_char_bag, entries_by_path: Default::default(), entries_by_id: Default::default(), - scan_id: worktree.scan_id as usize, + scan_id: 0, + is_complete: false, }; let (updates_tx, mut updates_rx) = mpsc::unbounded(); let background_snapshot = Arc::new(Mutex::new(snapshot.clone())); let (mut snapshot_updated_tx, mut snapshot_updated_rx) = watch::channel(); - let (mut last_scan_id_tx, last_scan_id_rx) = watch::channel_with(worktree.scan_id as usize); let worktree_handle = cx.add_model(|_: &mut ModelContext| { Worktree::Remote(RemoteWorktree { project_id: project_remote_id, @@ -203,96 +207,50 @@ impl Worktree { snapshot: snapshot.clone(), background_snapshot: background_snapshot.clone(), updates_tx: Some(updates_tx), - last_scan_id_rx, + snapshot_subscriptions: Default::default(), client: client.clone(), - diagnostic_summaries: TreeMap::from_ordered_entries( - worktree.diagnostic_summaries.into_iter().map(|summary| { - ( - PathKey(PathBuf::from(summary.path).into()), - DiagnosticSummary { - language_server_id: summary.language_server_id as usize, - error_count: summary.error_count as usize, - warning_count: summary.warning_count as usize, - }, - ) - }), - ), + diagnostic_summaries: Default::default(), visible, }) }); - let deserialize_task = cx.spawn({ - let worktree_handle = worktree_handle.clone(); - |cx| async move { - let (entries_by_path, entries_by_id) = cx - .background() - .spawn(async move { - let mut entries_by_path_edits = Vec::new(); - let mut entries_by_id_edits = Vec::new(); - for entry in worktree.entries { - match Entry::try_from((&root_char_bag, entry)) { - Ok(entry) => { - entries_by_id_edits.push(Edit::Insert(PathEntry { - id: entry.id, - path: entry.path.clone(), - is_ignored: entry.is_ignored, - scan_id: 0, - })); - entries_by_path_edits.push(Edit::Insert(entry)); - } - Err(err) => log::warn!("error for remote worktree entry {:?}", err), - } - } - - let mut entries_by_path = SumTree::new(); - let mut entries_by_id = SumTree::new(); - entries_by_path.edit(entries_by_path_edits, &()); - entries_by_id.edit(entries_by_id_edits, &()); - - (entries_by_path, entries_by_id) - }) - .await; - - { - let mut snapshot = background_snapshot.lock(); - snapshot.entries_by_path = entries_by_path; - snapshot.entries_by_id = entries_by_id; + cx.background() + .spawn(async move { + while let Some(update) = updates_rx.next().await { + if let Err(error) = background_snapshot.lock().apply_remote_update(update) { + log::error!("error applying worktree update: {}", error); + } snapshot_updated_tx.send(()).await.ok(); } + }) + .detach(); - cx.background() - .spawn(async move { - while let Some(update) = updates_rx.next().await { - if let Err(error) = - background_snapshot.lock().apply_remote_update(update) - { - log::error!("error applying worktree update: {}", error); - } - snapshot_updated_tx.send(()).await.ok(); - } - }) - .detach(); - - cx.spawn(|mut cx| { - let this = worktree_handle.downgrade(); - async move { - while let Some(_) = snapshot_updated_rx.recv().await { - if let Some(this) = this.upgrade(&cx) { - this.update(&mut cx, |this, cx| { - this.poll_snapshot(cx); - let this = this.as_remote_mut().unwrap(); - *last_scan_id_tx.borrow_mut() = this.snapshot.scan_id; - }); - } else { - break; + cx.spawn(|mut cx| { + let this = worktree_handle.downgrade(); + async move { + while let Some(_) = snapshot_updated_rx.recv().await { + if let Some(this) = this.upgrade(&cx) { + this.update(&mut cx, |this, cx| { + this.poll_snapshot(cx); + let this = this.as_remote_mut().unwrap(); + while let Some((scan_id, _)) = this.snapshot_subscriptions.front() { + if this.observed_snapshot(*scan_id) { + let (_, tx) = this.snapshot_subscriptions.pop_front().unwrap(); + let _ = tx.send(()); + } else { + break; + } } - } + }); + } else { + break; } - }) - .detach(); + } } - }); - (worktree_handle, deserialize_task) + }) + .detach(); + + worktree_handle } pub fn as_local(&self) -> Option<&LocalWorktree> { @@ -376,38 +334,9 @@ impl Worktree { fn poll_snapshot(&mut self, cx: &mut ModelContext) { match self { - Self::Local(worktree) => { - let is_fake_fs = worktree.fs.is_fake(); - worktree.snapshot = worktree.background_snapshot.lock().clone(); - if worktree.is_scanning() { - if worktree.poll_task.is_none() { - worktree.poll_task = Some(cx.spawn_weak(|this, mut cx| async move { - if is_fake_fs { - #[cfg(any(test, feature = "test-support"))] - cx.background().simulate_random_delay().await; - } else { - smol::Timer::after(Duration::from_millis(100)).await; - } - if let Some(this) = this.upgrade(&cx) { - this.update(&mut cx, |this, cx| { - this.as_local_mut().unwrap().poll_task = None; - this.poll_snapshot(cx); - }); - } - })); - } - } else { - worktree.poll_task.take(); - cx.emit(Event::UpdatedEntries); - } - } - Self::Remote(worktree) => { - worktree.snapshot = worktree.background_snapshot.lock().clone(); - cx.emit(Event::UpdatedEntries); - } + Self::Local(worktree) => worktree.poll_snapshot(false, cx), + Self::Remote(worktree) => worktree.poll_snapshot(cx), }; - - cx.notify(); } } @@ -435,7 +364,8 @@ impl LocalWorktree { .context("failed to stat worktree path")?; let (scan_states_tx, mut scan_states_rx) = mpsc::unbounded(); - let (mut last_scan_state_tx, last_scan_state_rx) = watch::channel_with(ScanState::Scanning); + let (mut last_scan_state_tx, last_scan_state_rx) = + watch::channel_with(ScanState::Initializing); let tree = cx.add_model(move |cx: &mut ModelContext| { let mut snapshot = LocalSnapshot { abs_path, @@ -449,6 +379,7 @@ impl LocalWorktree { entries_by_path: Default::default(), entries_by_id: Default::default(), scan_id: 0, + is_complete: true, }, }; if let Some(metadata) = metadata { @@ -479,11 +410,7 @@ impl LocalWorktree { while let Some(scan_state) = scan_states_rx.next().await { if let Some(this) = this.upgrade(&cx) { last_scan_state_tx.blocking_send(scan_state).ok(); - this.update(&mut cx, |this, cx| { - this.poll_snapshot(cx); - this.as_local().unwrap().broadcast_snapshot() - }) - .await; + this.update(&mut cx, |this, cx| this.poll_snapshot(cx)); } else { break; } @@ -567,22 +494,53 @@ impl LocalWorktree { Ok(updated) } + fn poll_snapshot(&mut self, force: bool, cx: &mut ModelContext) { + self.poll_task.take(); + match self.scan_state() { + ScanState::Idle => { + self.snapshot = self.background_snapshot.lock().clone(); + if let Some(share) = self.share.as_mut() { + *share.snapshots_tx.borrow_mut() = self.snapshot.clone(); + } + cx.emit(Event::UpdatedEntries); + } + ScanState::Initializing => { + let is_fake_fs = self.fs.is_fake(); + self.snapshot = self.background_snapshot.lock().clone(); + self.poll_task = Some(cx.spawn_weak(|this, mut cx| async move { + if is_fake_fs { + #[cfg(any(test, feature = "test-support"))] + cx.background().simulate_random_delay().await; + } else { + smol::Timer::after(Duration::from_millis(100)).await; + } + if let Some(this) = this.upgrade(&cx) { + this.update(&mut cx, |this, cx| this.poll_snapshot(cx)); + } + })); + cx.emit(Event::UpdatedEntries); + } + _ => { + if force { + self.snapshot = self.background_snapshot.lock().clone(); + } + } + } + cx.notify(); + } + pub fn scan_complete(&self) -> impl Future { let mut scan_state_rx = self.last_scan_state_rx.clone(); async move { let mut scan_state = Some(scan_state_rx.borrow().clone()); - while let Some(ScanState::Scanning) = scan_state { + while let Some(ScanState::Initializing | ScanState::Updating) = scan_state { scan_state = scan_state_rx.recv().await; } } } - fn is_scanning(&self) -> bool { - if let ScanState::Scanning = *self.last_scan_state_rx.borrow() { - true - } else { - false - } + fn scan_state(&self) -> ScanState { + self.last_scan_state_rx.borrow().clone() } pub fn snapshot(&self) -> LocalSnapshot { @@ -612,7 +570,6 @@ impl LocalWorktree { .refresh_entry(path, abs_path, None, cx) }) .await?; - this.update(&mut cx, |this, cx| this.poll_snapshot(cx)); Ok(( File { entry_id: Some(entry.id), @@ -710,16 +667,14 @@ impl LocalWorktree { Some(cx.spawn(|this, mut cx| async move { delete.await?; - this.update(&mut cx, |this, _| { + this.update(&mut cx, |this, cx| { let this = this.as_local_mut().unwrap(); - let mut snapshot = this.background_snapshot.lock(); - snapshot.delete_entry(entry_id); + { + let mut snapshot = this.background_snapshot.lock(); + snapshot.delete_entry(entry_id); + } + this.poll_snapshot(true, cx); }); - this.update(&mut cx, |this, cx| { - this.poll_snapshot(cx); - this.as_local().unwrap().broadcast_snapshot() - }) - .await; Ok(()) })) } @@ -755,11 +710,6 @@ impl LocalWorktree { ) }) .await?; - this.update(&mut cx, |this, cx| { - this.poll_snapshot(cx); - this.as_local().unwrap().broadcast_snapshot() - }) - .await; Ok(entry) })) } @@ -795,11 +745,6 @@ impl LocalWorktree { ) }) .await?; - this.update(&mut cx, |this, cx| { - this.poll_snapshot(cx); - this.as_local().unwrap().broadcast_snapshot() - }) - .await; Ok(entry) })) } @@ -833,11 +778,6 @@ impl LocalWorktree { .refresh_entry(path, abs_path, None, cx) }) .await?; - this.update(&mut cx, |this, cx| { - this.poll_snapshot(cx); - this.as_local().unwrap().broadcast_snapshot() - }) - .await; Ok(entry) }) } @@ -870,61 +810,55 @@ impl LocalWorktree { let this = this .upgrade(&cx) .ok_or_else(|| anyhow!("worktree was dropped"))?; - let (entry, snapshot, snapshots_tx) = this.read_with(&cx, |this, _| { - let this = this.as_local().unwrap(); - let mut snapshot = this.background_snapshot.lock(); - entry.is_ignored = snapshot - .ignore_stack_for_path(&path, entry.is_dir()) - .is_path_ignored(&path, entry.is_dir()); - if let Some(old_path) = old_path { - snapshot.remove_path(&old_path); + this.update(&mut cx, |this, cx| { + let this = this.as_local_mut().unwrap(); + let inserted_entry; + { + let mut snapshot = this.background_snapshot.lock(); + entry.is_ignored = snapshot + .ignore_stack_for_path(&path, entry.is_dir()) + .is_path_ignored(&path, entry.is_dir()); + if let Some(old_path) = old_path { + snapshot.remove_path(&old_path); + } + inserted_entry = snapshot.insert_entry(entry, fs.as_ref()); + snapshot.scan_id += 1; } - let entry = snapshot.insert_entry(entry, fs.as_ref()); - snapshot.scan_id += 1; - let snapshots_tx = this.share.as_ref().map(|s| s.snapshots_tx.clone()); - (entry, snapshot.clone(), snapshots_tx) - }); - this.update(&mut cx, |this, cx| this.poll_snapshot(cx)); - - if let Some(snapshots_tx) = snapshots_tx { - snapshots_tx.send(snapshot).await.ok(); - } - - Ok(entry) + this.poll_snapshot(true, cx); + Ok(inserted_entry) + }) }) } pub fn share(&mut self, project_id: u64, cx: &mut ModelContext) -> Task> { let (share_tx, share_rx) = oneshot::channel(); - let (snapshots_to_send_tx, snapshots_to_send_rx) = - smol::channel::unbounded::(); + if self.share.is_some() { let _ = share_tx.send(Ok(())); } else { + let (snapshots_tx, mut snapshots_rx) = watch::channel_with(self.snapshot()); let rpc = self.client.clone(); let worktree_id = cx.model_id() as u64; let maintain_remote_snapshot = cx.background().spawn({ let rpc = rpc.clone(); let diagnostic_summaries = self.diagnostic_summaries.clone(); async move { - let mut prev_snapshot = match snapshots_to_send_rx.recv().await { - Ok(snapshot) => { - if let Err(error) = rpc - .request(proto::UpdateWorktree { - project_id, - worktree_id, - root_name: snapshot.root_name().to_string(), - updated_entries: snapshot - .entries_by_path - .iter() - .filter(|e| !e.is_ignored) - .map(Into::into) - .collect(), - removed_entries: Default::default(), - scan_id: snapshot.scan_id as u64, - }) - .await - { + let mut prev_snapshot = match snapshots_rx.recv().await { + Some(snapshot) => { + let update = proto::UpdateWorktree { + project_id, + worktree_id, + root_name: snapshot.root_name().to_string(), + updated_entries: snapshot + .entries_by_path + .iter() + .map(Into::into) + .collect(), + removed_entries: Default::default(), + scan_id: snapshot.scan_id as u64, + is_last_update: true, + }; + if let Err(error) = send_worktree_update(&rpc, update).await { let _ = share_tx.send(Err(error)); return Err(anyhow!("failed to send initial update worktree")); } else { @@ -932,8 +866,10 @@ impl LocalWorktree { snapshot } } - Err(error) => { - let _ = share_tx.send(Err(error.into())); + None => { + share_tx + .send(Err(anyhow!("worktree dropped before share completed"))) + .ok(); return Err(anyhow!("failed to send initial update worktree")); } }; @@ -946,44 +882,12 @@ impl LocalWorktree { })?; } - // Stream ignored entries in chunks. - { - let mut ignored_entries = prev_snapshot - .entries_by_path - .iter() - .filter(|e| e.is_ignored); - let mut ignored_entries_to_send = Vec::new(); - loop { - #[cfg(any(test, feature = "test-support"))] - const CHUNK_SIZE: usize = 2; - #[cfg(not(any(test, feature = "test-support")))] - const CHUNK_SIZE: usize = 256; - - let entry = ignored_entries.next(); - if ignored_entries_to_send.len() >= CHUNK_SIZE || entry.is_none() { - rpc.request(proto::UpdateWorktree { - project_id, - worktree_id, - root_name: prev_snapshot.root_name().to_string(), - updated_entries: mem::take(&mut ignored_entries_to_send), - removed_entries: Default::default(), - scan_id: prev_snapshot.scan_id as u64, - }) - .await?; - } - - if let Some(entry) = entry { - ignored_entries_to_send.push(entry.into()); - } else { - break; - } - } - } - - while let Ok(snapshot) = snapshots_to_send_rx.recv().await { - let message = - snapshot.build_update(&prev_snapshot, project_id, worktree_id, true); - rpc.request(message).await?; + while let Some(snapshot) = snapshots_rx.recv().await { + send_worktree_update( + &rpc, + snapshot.build_update(&prev_snapshot, project_id, worktree_id, true), + ) + .await?; prev_snapshot = snapshot; } @@ -993,18 +897,12 @@ impl LocalWorktree { }); self.share = Some(ShareState { project_id, - snapshots_tx: snapshots_to_send_tx.clone(), + snapshots_tx, _maintain_remote_snapshot: Some(maintain_remote_snapshot), }); } - cx.spawn_weak(|this, cx| async move { - if let Some(this) = this.upgrade(&cx) { - this.read_with(&cx, |this, _| { - let this = this.as_local().unwrap(); - let _ = snapshots_to_send_tx.try_send(this.snapshot()); - }); - } + cx.foreground().spawn(async move { share_rx .await .unwrap_or_else(|_| Err(anyhow!("share ended"))) @@ -1018,23 +916,6 @@ impl LocalWorktree { pub fn is_shared(&self) -> bool { self.share.is_some() } - - fn broadcast_snapshot(&self) -> impl Future { - let mut to_send = None; - if !self.is_scanning() { - if let Some(share) = self.share.as_ref() { - to_send = Some((self.snapshot(), share.snapshots_tx.clone())); - } - } - - async move { - if let Some((snapshot, snapshots_to_send_tx)) = to_send { - if let Err(err) = snapshots_to_send_tx.send(snapshot).await { - log::error!("error submitting snapshot to send {}", err); - } - } - } - } } impl RemoteWorktree { @@ -1042,31 +923,45 @@ impl RemoteWorktree { self.snapshot.clone() } + fn poll_snapshot(&mut self, cx: &mut ModelContext) { + self.snapshot = self.background_snapshot.lock().clone(); + cx.emit(Event::UpdatedEntries); + cx.notify(); + } + pub fn disconnected_from_host(&mut self) { self.updates_tx.take(); + self.snapshot_subscriptions.clear(); } - pub fn update_from_remote( - &mut self, - envelope: TypedEnvelope, - ) -> Result<()> { + pub fn update_from_remote(&mut self, update: proto::UpdateWorktree) { if let Some(updates_tx) = &self.updates_tx { updates_tx - .unbounded_send(envelope.payload) + .unbounded_send(update) .expect("consumer runs to completion"); } - Ok(()) } - fn wait_for_snapshot(&self, scan_id: usize) -> impl Future { - let mut rx = self.last_scan_id_rx.clone(); - async move { - while let Some(applied_scan_id) = rx.next().await { - if applied_scan_id >= scan_id { - return; - } + fn observed_snapshot(&self, scan_id: usize) -> bool { + self.scan_id > scan_id || (self.scan_id == scan_id && self.is_complete) + } + + fn wait_for_snapshot(&mut self, scan_id: usize) -> impl Future { + let (tx, rx) = oneshot::channel(); + if self.observed_snapshot(scan_id) { + let _ = tx.send(()); + } else { + match self + .snapshot_subscriptions + .binary_search_by_key(&scan_id, |probe| probe.0) + { + Ok(ix) | Err(ix) => self.snapshot_subscriptions.insert(ix, (scan_id, tx)), } } + + async move { + let _ = rx.await; + } } pub fn update_diagnostic_summary( @@ -1088,7 +983,7 @@ impl RemoteWorktree { } pub fn insert_entry( - &self, + &mut self, entry: proto::Entry, scan_id: usize, cx: &mut ModelContext, @@ -1107,7 +1002,7 @@ impl RemoteWorktree { } pub(crate) fn delete_entry( - &self, + &mut self, id: ProjectEntryId, scan_id: usize, cx: &mut ModelContext, @@ -1183,7 +1078,7 @@ impl Snapshot { for entry_id in update.removed_entries { let entry = self .entry_for_id(ProjectEntryId::from_proto(entry_id)) - .ok_or_else(|| anyhow!("unknown entry"))?; + .ok_or_else(|| anyhow!("unknown entry {}", entry_id))?; entries_by_path_edits.push(Edit::Remove(PathKey(entry.path.clone()))); entries_by_id_edits.push(Edit::Remove(entry.id)); } @@ -1205,6 +1100,7 @@ impl Snapshot { self.entries_by_path.edit(entries_by_path_edits, &()); self.entries_by_id.edit(entries_by_id_edits, &()); self.scan_id = update.scan_id as usize; + self.is_complete = update.is_last_update; Ok(()) } @@ -1326,27 +1222,16 @@ impl LocalSnapshot { } #[cfg(test)] - pub(crate) fn to_proto( - &self, - diagnostic_summaries: &TreeMap, - visible: bool, - ) -> proto::Worktree { + pub(crate) fn build_initial_update(&self, project_id: u64) -> proto::UpdateWorktree { let root_name = self.root_name.clone(); - proto::Worktree { - id: self.id.0 as u64, + proto::UpdateWorktree { + project_id, + worktree_id: self.id().to_proto(), root_name, - entries: self - .entries_by_path - .iter() - .filter(|e| !e.is_ignored) - .map(Into::into) - .collect(), - diagnostic_summaries: diagnostic_summaries - .iter() - .map(|(path, summary)| summary.to_proto(&path.0)) - .collect(), - visible, + updated_entries: self.entries_by_path.iter().map(Into::into).collect(), + removed_entries: Default::default(), scan_id: self.scan_id as u64, + is_last_update: true, } } @@ -1413,6 +1298,7 @@ impl LocalSnapshot { updated_entries, removed_entries, scan_id: self.scan_id as u64, + is_last_update: true, } } @@ -2050,7 +1936,7 @@ impl BackgroundScanner { } async fn run(mut self, events_rx: impl Stream>) { - if self.notify.unbounded_send(ScanState::Scanning).is_err() { + if self.notify.unbounded_send(ScanState::Initializing).is_err() { return; } @@ -2069,8 +1955,13 @@ impl BackgroundScanner { } futures::pin_mut!(events_rx); - while let Some(events) = events_rx.next().await { - if self.notify.unbounded_send(ScanState::Scanning).is_err() { + + while let Some(mut events) = events_rx.next().await { + while let Poll::Ready(Some(additional_events)) = futures::poll!(events_rx.next()) { + events.extend(additional_events); + } + + if self.notify.unbounded_send(ScanState::Updating).is_err() { break; } @@ -2722,6 +2613,19 @@ impl<'a> TryFrom<(&'a CharBag, proto::Entry)> for Entry { } } +async fn send_worktree_update(client: &Arc, update: proto::UpdateWorktree) -> Result<()> { + #[cfg(any(test, feature = "test-support"))] + const MAX_CHUNK_SIZE: usize = 2; + #[cfg(not(any(test, feature = "test-support")))] + const MAX_CHUNK_SIZE: usize = 256; + + for update in proto::split_worktree_update(update, MAX_CHUNK_SIZE) { + client.request(update).await?; + } + + Ok(()) +} + #[cfg(test)] mod tests { use super::*; @@ -2931,6 +2835,7 @@ mod tests { root_name: Default::default(), root_char_bag: Default::default(), scan_id: 0, + is_complete: true, }, }; initial_snapshot.insert_entry( diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index 69ccae1704ebb05b87dad60a5bd32c9f00432fa3..e3ca60c2519980d8771d5205517962c904ebbc72 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -168,7 +168,7 @@ message JoinProjectResponse { message Accept { uint32 replica_id = 1; - repeated Worktree worktrees = 2; + repeated WorktreeMetadata worktrees = 2; repeated Collaborator collaborators = 3; repeated LanguageServer language_servers = 4; } @@ -195,6 +195,7 @@ message UpdateWorktree { repeated Entry updated_entries = 4; repeated uint64 removed_entries = 5; uint64 scan_id = 6; + bool is_last_update = 7; } message CreateProjectEntry { @@ -765,15 +766,6 @@ message User { string avatar_url = 3; } -message Worktree { - uint64 id = 1; - string root_name = 2; - repeated Entry entries = 3; - repeated DiagnosticSummary diagnostic_summaries = 4; - bool visible = 5; - uint64 scan_id = 6; -} - message File { uint64 worktree_id = 1; optional uint64 entry_id = 2; diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index ecee3709863b4239dcd43b3c2e9c21eb4868880c..6200f37cd2da253cb511241c9eed32a2819b50e6 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -5,6 +5,7 @@ use futures::{SinkExt as _, StreamExt as _}; use prost::Message as _; use serde::Serialize; use std::any::{Any, TypeId}; +use std::{cmp, iter, mem}; use std::{ fmt::Debug, io, @@ -390,6 +391,31 @@ impl From for u128 { } } +pub fn split_worktree_update( + mut message: UpdateWorktree, + max_chunk_size: usize, +) -> impl Iterator { + let mut done = false; + iter::from_fn(move || { + if done { + return None; + } + + let chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size); + let updated_entries = message.updated_entries.drain(..chunk_size).collect(); + done = message.updated_entries.is_empty(); + Some(UpdateWorktree { + project_id: message.project_id, + worktree_id: message.worktree_id, + root_name: message.root_name.clone(), + updated_entries, + removed_entries: mem::take(&mut message.removed_entries), + scan_id: message.scan_id, + is_last_update: done && message.is_last_update, + }) + }) +} + #[cfg(test)] mod tests { use super::*;