Merge pull request #1265 from zed-industries/worktree-performance

Antonio Scandurra created

Fix problems that arise when large numbers of files change on disk

Change summary

crates/client/src/client.rs            |  17 
crates/collab/src/db.rs                |  11 
crates/collab/src/integration_tests.rs |  39 +-
crates/collab/src/rpc.rs               | 169 ++++----
crates/collab/src/rpc/store.rs         |   3 
crates/project/src/project.rs          |  64 +-
crates/project/src/worktree.rs         | 503 +++++++++++----------------
crates/rpc/proto/zed.proto             |  12 
crates/rpc/src/proto.rs                |  26 +
9 files changed, 392 insertions(+), 452 deletions(-)

Detailed changes

crates/client/src/client.rs 🔗

@@ -28,10 +28,7 @@ use std::{
     convert::TryFrom,
     fmt::Write as _,
     future::Future,
-    sync::{
-        atomic::{AtomicUsize, Ordering},
-        Arc, Weak,
-    },
+    sync::{Arc, Weak},
     time::{Duration, Instant},
 };
 use thiserror::Error;
@@ -232,12 +229,8 @@ impl Drop for Subscription {
 
 impl Client {
     pub fn new(http: Arc<dyn HttpClient>) -> Arc<Self> {
-        lazy_static! {
-            static ref NEXT_CLIENT_ID: AtomicUsize = AtomicUsize::default();
-        }
-
         Arc::new(Self {
-            id: NEXT_CLIENT_ID.fetch_add(1, Ordering::SeqCst),
+            id: 0,
             peer: Peer::new(),
             http,
             state: Default::default(),
@@ -257,6 +250,12 @@ impl Client {
         self.http.clone()
     }
 
+    #[cfg(any(test, feature = "test-support"))]
+    pub fn set_id(&mut self, id: usize) -> &Self {
+        self.id = id;
+        self
+    }
+
     #[cfg(any(test, feature = "test-support"))]
     pub fn tear_down(&self) {
         let mut state = self.state.write();

crates/collab/src/db.rs 🔗

@@ -2117,7 +2117,7 @@ pub mod tests {
             Self {
                 background,
                 users: Default::default(),
-                next_user_id: Mutex::new(1),
+                next_user_id: Mutex::new(0),
                 projects: Default::default(),
                 worktree_extensions: Default::default(),
                 next_project_id: Mutex::new(1),
@@ -2181,6 +2181,7 @@ pub mod tests {
         }
 
         async fn get_user_by_id(&self, id: UserId) -> Result<Option<User>> {
+            self.background.simulate_random_delay().await;
             Ok(self.get_users_by_ids(vec![id]).await?.into_iter().next())
         }
 
@@ -2195,6 +2196,7 @@ pub mod tests {
         }
 
         async fn get_user_by_github_login(&self, github_login: &str) -> Result<Option<User>> {
+            self.background.simulate_random_delay().await;
             Ok(self
                 .users
                 .lock()
@@ -2228,6 +2230,7 @@ pub mod tests {
         }
 
         async fn get_invite_code_for_user(&self, _id: UserId) -> Result<Option<(String, u32)>> {
+            self.background.simulate_random_delay().await;
             Ok(None)
         }
 
@@ -2265,6 +2268,7 @@ pub mod tests {
         }
 
         async fn unregister_project(&self, project_id: ProjectId) -> Result<()> {
+            self.background.simulate_random_delay().await;
             self.projects
                 .lock()
                 .get_mut(&project_id)
@@ -2370,6 +2374,7 @@ pub mod tests {
             requester_id: UserId,
             responder_id: UserId,
         ) -> Result<()> {
+            self.background.simulate_random_delay().await;
             let mut contacts = self.contacts.lock();
             for contact in contacts.iter_mut() {
                 if contact.requester_id == requester_id && contact.responder_id == responder_id {
@@ -2399,6 +2404,7 @@ pub mod tests {
         }
 
         async fn remove_contact(&self, requester_id: UserId, responder_id: UserId) -> Result<()> {
+            self.background.simulate_random_delay().await;
             self.contacts.lock().retain(|contact| {
                 !(contact.requester_id == requester_id && contact.responder_id == responder_id)
             });
@@ -2410,6 +2416,7 @@ pub mod tests {
             user_id: UserId,
             contact_user_id: UserId,
         ) -> Result<()> {
+            self.background.simulate_random_delay().await;
             let mut contacts = self.contacts.lock();
             for contact in contacts.iter_mut() {
                 if contact.requester_id == contact_user_id
@@ -2436,6 +2443,7 @@ pub mod tests {
             requester_id: UserId,
             accept: bool,
         ) -> Result<()> {
+            self.background.simulate_random_delay().await;
             let mut contacts = self.contacts.lock();
             for (ix, contact) in contacts.iter_mut().enumerate() {
                 if contact.requester_id == requester_id && contact.responder_id == responder_id {
@@ -2631,6 +2639,7 @@ pub mod tests {
             count: usize,
             before_id: Option<MessageId>,
         ) -> Result<Vec<ChannelMessage>> {
+            self.background.simulate_random_delay().await;
             let mut messages = self
                 .channel_messages
                 .lock()

crates/collab/src/integration_tests.rs 🔗

@@ -50,7 +50,6 @@ use std::{
     time::Duration,
 };
 use theme::ThemeRegistry;
-use tokio::sync::RwLockReadGuard;
 use workspace::{Item, SplitDirection, ToggleFollow, Workspace};
 
 #[ctor::ctor]
@@ -589,7 +588,7 @@ async fn test_offline_projects(
     deterministic.run_until_parked();
     assert!(server
         .store
-        .read()
+        .lock()
         .await
         .project_metadata_for_user(user_a)
         .is_empty());
@@ -620,7 +619,7 @@ async fn test_offline_projects(
     cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
     assert!(server
         .store
-        .read()
+        .lock()
         .await
         .project_metadata_for_user(user_a)
         .is_empty());
@@ -1446,7 +1445,7 @@ async fn test_collaborating_with_diagnostics(
     // Wait for server to see the diagnostics update.
     deterministic.run_until_parked();
     {
-        let store = server.store.read().await;
+        let store = server.store.lock().await;
         let project = store.project(ProjectId::from_proto(project_id)).unwrap();
         let worktree = project.worktrees.get(&worktree_id.to_proto()).unwrap();
         assert!(!worktree.diagnostic_summaries.is_empty());
@@ -1472,6 +1471,7 @@ async fn test_collaborating_with_diagnostics(
 
     // Join project as client C and observe the diagnostics.
     let project_c = client_c.build_remote_project(&project_a, cx_a, cx_c).await;
+    deterministic.run_until_parked();
     project_c.read_with(cx_c, |project, cx| {
         assert_eq!(
             project.diagnostic_summaries(cx).collect::<Vec<_>>(),
@@ -3171,7 +3171,7 @@ async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
 
     assert_eq!(
         server
-            .state()
+            .store()
             .await
             .channel(channel_id)
             .unwrap()
@@ -4425,8 +4425,16 @@ async fn test_random_collaboration(
     let mut server = TestServer::start(cx.foreground(), cx.background()).await;
     let db = server.app_state.db.clone();
     let host_user_id = db.create_user("host", None, false).await.unwrap();
-    for username in ["guest-1", "guest-2", "guest-3", "guest-4"] {
+    let mut available_guests = vec![
+        "guest-1".to_string(),
+        "guest-2".to_string(),
+        "guest-3".to_string(),
+        "guest-4".to_string(),
+    ];
+
+    for username in &available_guests {
         let guest_user_id = db.create_user(username, None, false).await.unwrap();
+        assert_eq!(*username, format!("guest-{}", guest_user_id));
         server
             .app_state
             .db
@@ -4620,12 +4628,7 @@ async fn test_random_collaboration(
     } else {
         max_operations
     };
-    let mut available_guests = vec![
-        "guest-1".to_string(),
-        "guest-2".to_string(),
-        "guest-3".to_string(),
-        "guest-4".to_string(),
-    ];
+
     let mut operations = 0;
     while operations < max_operations {
         if operations == disconnect_host_at {
@@ -4656,7 +4659,7 @@ async fn test_random_collaboration(
                     .unwrap();
                 let contacts = server
                     .store
-                    .read()
+                    .lock()
                     .await
                     .build_initial_contacts_update(contacts)
                     .contacts;
@@ -4728,6 +4731,7 @@ async fn test_random_collaboration(
                 server.disconnect_client(removed_guest_id);
                 deterministic.advance_clock(RECEIVE_TIMEOUT);
                 deterministic.start_waiting();
+                log::info!("Waiting for guest {} to exit...", removed_guest_id);
                 let (guest, guest_project, mut guest_cx, guest_err) = guest.await;
                 deterministic.finish_waiting();
                 server.allow_connections();
@@ -4740,7 +4744,7 @@ async fn test_random_collaboration(
                     let contacts = server.app_state.db.get_contacts(*user_id).await.unwrap();
                     let contacts = server
                         .store
-                        .read()
+                        .lock()
                         .await
                         .build_initial_contacts_update(contacts)
                         .contacts;
@@ -4944,6 +4948,7 @@ impl TestServer {
 
         Arc::get_mut(&mut client)
             .unwrap()
+            .set_id(user_id.0 as usize)
             .override_authenticate(move |cx| {
                 cx.spawn(|_| async move {
                     let access_token = "the-token".to_string();
@@ -5071,10 +5076,6 @@ impl TestServer {
         })
     }
 
-    async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
-        self.server.store.read().await
-    }
-
     async fn condition<F>(&mut self, mut predicate: F)
     where
         F: FnMut(&Store) -> bool,
@@ -5083,7 +5084,7 @@ impl TestServer {
             self.foreground.parking_forbidden(),
             "you must call forbid_parking to use server conditions so we don't block indefinitely"
         );
-        while !(predicate)(&*self.server.store.read().await) {
+        while !(predicate)(&*self.server.store.lock().await) {
             self.foreground.start_waiting();
             self.notifications.next().await;
             self.foreground.finish_waiting();

crates/collab/src/rpc.rs 🔗

@@ -51,7 +51,7 @@ use std::{
 };
 use time::OffsetDateTime;
 use tokio::{
-    sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
+    sync::{Mutex, MutexGuard},
     time::Sleep,
 };
 use tower::ServiceBuilder;
@@ -97,7 +97,7 @@ impl<R: RequestMessage> Response<R> {
 
 pub struct Server {
     peer: Arc<Peer>,
-    pub(crate) store: RwLock<Store>,
+    pub(crate) store: Mutex<Store>,
     app_state: Arc<AppState>,
     handlers: HashMap<TypeId, MessageHandler>,
     notifications: Option<mpsc::UnboundedSender<()>>,
@@ -115,13 +115,8 @@ pub struct RealExecutor;
 const MESSAGE_COUNT_PER_PAGE: usize = 100;
 const MAX_MESSAGE_LEN: usize = 1024;
 
-struct StoreReadGuard<'a> {
-    guard: RwLockReadGuard<'a, Store>,
-    _not_send: PhantomData<Rc<()>>,
-}
-
-struct StoreWriteGuard<'a> {
-    guard: RwLockWriteGuard<'a, Store>,
+pub(crate) struct StoreGuard<'a> {
+    guard: MutexGuard<'a, Store>,
     _not_send: PhantomData<Rc<()>>,
 }
 
@@ -129,7 +124,7 @@ struct StoreWriteGuard<'a> {
 pub struct ServerSnapshot<'a> {
     peer: &'a Peer,
     #[serde(serialize_with = "serialize_deref")]
-    store: RwLockReadGuard<'a, Store>,
+    store: StoreGuard<'a>,
 }
 
 pub fn serialize_deref<S, T, U>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
@@ -384,7 +379,7 @@ impl Server {
             ).await?;
 
             {
-                let mut store = this.store_mut().await;
+                let mut store = this.store().await;
                 store.add_connection(connection_id, user_id, user.admin);
                 this.peer.send(connection_id, store.build_initial_contacts_update(contacts))?;
 
@@ -471,7 +466,7 @@ impl Server {
         let mut projects_to_unregister = Vec::new();
         let removed_user_id;
         {
-            let mut store = self.store_mut().await;
+            let mut store = self.store().await;
             let removed_connection = store.remove_connection(connection_id)?;
 
             for (project_id, project) in removed_connection.hosted_projects {
@@ -605,7 +600,7 @@ impl Server {
             .await
             .user_id_for_connection(request.sender_id)?;
         let project_id = self.app_state.db.register_project(user_id).await?;
-        self.store_mut()
+        self.store()
             .await
             .register_project(request.sender_id, project_id)?;
 
@@ -623,7 +618,7 @@ impl Server {
     ) -> Result<()> {
         let project_id = ProjectId::from_proto(request.payload.project_id);
         let (user_id, project) = {
-            let mut state = self.store_mut().await;
+            let mut state = self.store().await;
             let project = state.unregister_project(project_id, request.sender_id)?;
             (state.user_id_for_connection(request.sender_id)?, project)
         };
@@ -725,7 +720,7 @@ impl Server {
             return Err(anyhow!("no such project"))?;
         }
 
-        self.store_mut().await.request_join_project(
+        self.store().await.request_join_project(
             guest_user_id,
             project_id,
             response.into_receipt(),
@@ -747,7 +742,7 @@ impl Server {
         let host_user_id;
 
         {
-            let mut state = self.store_mut().await;
+            let mut state = self.store().await;
             let project_id = ProjectId::from_proto(request.payload.project_id);
             let project = state.project(project_id)?;
             if project.host_connection_id != request.sender_id {
@@ -791,20 +786,10 @@ impl Server {
             let worktrees = project
                 .worktrees
                 .iter()
-                .filter_map(|(id, shared_worktree)| {
-                    let worktree = project.worktrees.get(&id)?;
-                    Some(proto::Worktree {
-                        id: *id,
-                        root_name: worktree.root_name.clone(),
-                        entries: shared_worktree.entries.values().cloned().collect(),
-                        diagnostic_summaries: shared_worktree
-                            .diagnostic_summaries
-                            .values()
-                            .cloned()
-                            .collect(),
-                        visible: worktree.visible,
-                        scan_id: shared_worktree.scan_id,
-                    })
+                .map(|(id, worktree)| proto::WorktreeMetadata {
+                    id: *id,
+                    root_name: worktree.root_name.clone(),
+                    visible: worktree.visible,
                 })
                 .collect::<Vec<_>>();
 
@@ -840,14 +825,15 @@ impl Server {
                 }
             }
 
-            for (receipt, replica_id) in receipts_with_replica_ids {
+            // First, we send the metadata associated with each worktree.
+            for (receipt, replica_id) in &receipts_with_replica_ids {
                 self.peer.respond(
-                    receipt,
+                    receipt.clone(),
                     proto::JoinProjectResponse {
                         variant: Some(proto::join_project_response::Variant::Accept(
                             proto::join_project_response::Accept {
                                 worktrees: worktrees.clone(),
-                                replica_id: replica_id as u32,
+                                replica_id: *replica_id as u32,
                                 collaborators: collaborators.clone(),
                                 language_servers: project.language_servers.clone(),
                             },
@@ -855,6 +841,43 @@ impl Server {
                     },
                 )?;
             }
+
+            for (worktree_id, worktree) in &project.worktrees {
+                #[cfg(any(test, feature = "test-support"))]
+                const MAX_CHUNK_SIZE: usize = 2;
+                #[cfg(not(any(test, feature = "test-support")))]
+                const MAX_CHUNK_SIZE: usize = 256;
+
+                // Stream this worktree's entries.
+                let message = proto::UpdateWorktree {
+                    project_id: project_id.to_proto(),
+                    worktree_id: *worktree_id,
+                    root_name: worktree.root_name.clone(),
+                    updated_entries: worktree.entries.values().cloned().collect(),
+                    removed_entries: Default::default(),
+                    scan_id: worktree.scan_id,
+                    is_last_update: worktree.is_complete,
+                };
+                for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) {
+                    for (receipt, _) in &receipts_with_replica_ids {
+                        self.peer.send(receipt.sender_id, update.clone())?;
+                    }
+                }
+
+                // Stream this worktree's diagnostics.
+                for summary in worktree.diagnostic_summaries.values() {
+                    for (receipt, _) in &receipts_with_replica_ids {
+                        self.peer.send(
+                            receipt.sender_id,
+                            proto::UpdateDiagnosticSummary {
+                                project_id: project_id.to_proto(),
+                                worktree_id: *worktree_id,
+                                summary: Some(summary.clone()),
+                            },
+                        )?;
+                    }
+                }
+            }
         }
 
         self.update_user_contacts(host_user_id).await?;
@@ -869,7 +892,7 @@ impl Server {
         let project_id = ProjectId::from_proto(request.payload.project_id);
         let project;
         {
-            let mut store = self.store_mut().await;
+            let mut store = self.store().await;
             project = store.leave_project(sender_id, project_id)?;
             tracing::info!(
                 %project_id,
@@ -920,7 +943,7 @@ impl Server {
         let project_id = ProjectId::from_proto(request.payload.project_id);
         let user_id;
         {
-            let mut state = self.store_mut().await;
+            let mut state = self.store().await;
             user_id = state.user_id_for_connection(request.sender_id)?;
             let guest_connection_ids = state
                 .read_project(project_id, request.sender_id)?
@@ -939,7 +962,7 @@ impl Server {
         self: Arc<Server>,
         request: TypedEnvelope<proto::RegisterProjectActivity>,
     ) -> Result<()> {
-        self.store_mut().await.register_project_activity(
+        self.store().await.register_project_activity(
             ProjectId::from_proto(request.payload.project_id),
             request.sender_id,
         )?;
@@ -954,7 +977,7 @@ impl Server {
         let project_id = ProjectId::from_proto(request.payload.project_id);
         let worktree_id = request.payload.worktree_id;
         let (connection_ids, metadata_changed, extension_counts) = {
-            let mut store = self.store_mut().await;
+            let mut store = self.store().await;
             let (connection_ids, metadata_changed, extension_counts) = store.update_worktree(
                 request.sender_id,
                 project_id,
@@ -963,6 +986,7 @@ impl Server {
                 &request.payload.removed_entries,
                 &request.payload.updated_entries,
                 request.payload.scan_id,
+                request.payload.is_last_update,
             )?;
             (connection_ids, metadata_changed, extension_counts.clone())
         };
@@ -995,7 +1019,7 @@ impl Server {
             .summary
             .clone()
             .ok_or_else(|| anyhow!("invalid summary"))?;
-        let receiver_ids = self.store_mut().await.update_diagnostic_summary(
+        let receiver_ids = self.store().await.update_diagnostic_summary(
             ProjectId::from_proto(request.payload.project_id),
             request.payload.worktree_id,
             request.sender_id,
@@ -1013,7 +1037,7 @@ impl Server {
         self: Arc<Server>,
         request: TypedEnvelope<proto::StartLanguageServer>,
     ) -> Result<()> {
-        let receiver_ids = self.store_mut().await.start_language_server(
+        let receiver_ids = self.store().await.start_language_server(
             ProjectId::from_proto(request.payload.project_id),
             request.sender_id,
             request
@@ -1052,20 +1076,23 @@ impl Server {
     where
         T: EntityMessage + RequestMessage,
     {
+        let project_id = ProjectId::from_proto(request.payload.remote_entity_id());
         let host_connection_id = self
             .store()
             .await
-            .read_project(
-                ProjectId::from_proto(request.payload.remote_entity_id()),
-                request.sender_id,
-            )?
+            .read_project(project_id, request.sender_id)?
             .host_connection_id;
+        let payload = self
+            .peer
+            .forward_request(request.sender_id, host_connection_id, request.payload)
+            .await?;
 
-        response.send(
-            self.peer
-                .forward_request(request.sender_id, host_connection_id, request.payload)
-                .await?,
-        )?;
+        // Ensure project still exists by the time we get the response from the host.
+        self.store()
+            .await
+            .read_project(project_id, request.sender_id)?;
+
+        response.send(payload)?;
         Ok(())
     }
 
@@ -1106,7 +1133,7 @@ impl Server {
     ) -> Result<()> {
         let project_id = ProjectId::from_proto(request.payload.project_id);
         let receiver_ids = {
-            let mut store = self.store_mut().await;
+            let mut store = self.store().await;
             store.register_project_activity(project_id, request.sender_id)?;
             store.project_connection_ids(project_id, request.sender_id)?
         };
@@ -1173,7 +1200,7 @@ impl Server {
         let leader_id = ConnectionId(request.payload.leader_id);
         let follower_id = request.sender_id;
         {
-            let mut store = self.store_mut().await;
+            let mut store = self.store().await;
             if !store
                 .project_connection_ids(project_id, follower_id)?
                 .contains(&leader_id)
@@ -1198,7 +1225,7 @@ impl Server {
     async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
         let project_id = ProjectId::from_proto(request.payload.project_id);
         let leader_id = ConnectionId(request.payload.leader_id);
-        let mut store = self.store_mut().await;
+        let mut store = self.store().await;
         if !store
             .project_connection_ids(project_id, request.sender_id)?
             .contains(&leader_id)
@@ -1216,7 +1243,7 @@ impl Server {
         request: TypedEnvelope<proto::UpdateFollowers>,
     ) -> Result<()> {
         let project_id = ProjectId::from_proto(request.payload.project_id);
-        let mut store = self.store_mut().await;
+        let mut store = self.store().await;
         store.register_project_activity(project_id, request.sender_id)?;
         let connection_ids = store.project_connection_ids(project_id, request.sender_id)?;
         let leader_id = request
@@ -1474,7 +1501,7 @@ impl Server {
             Err(anyhow!("access denied"))?;
         }
 
-        self.store_mut()
+        self.store()
             .await
             .join_channel(request.sender_id, channel_id);
         let messages = self
@@ -1516,7 +1543,7 @@ impl Server {
             Err(anyhow!("access denied"))?;
         }
 
-        self.store_mut()
+        self.store()
             .await
             .leave_channel(request.sender_id, channel_id);
 
@@ -1624,25 +1651,13 @@ impl Server {
         Ok(())
     }
 
-    async fn store<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
-        #[cfg(test)]
-        tokio::task::yield_now().await;
-        let guard = self.store.read().await;
-        #[cfg(test)]
-        tokio::task::yield_now().await;
-        StoreReadGuard {
-            guard,
-            _not_send: PhantomData,
-        }
-    }
-
-    async fn store_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
+    pub(crate) async fn store<'a>(&'a self) -> StoreGuard<'a> {
         #[cfg(test)]
         tokio::task::yield_now().await;
-        let guard = self.store.write().await;
+        let guard = self.store.lock().await;
         #[cfg(test)]
         tokio::task::yield_now().await;
-        StoreWriteGuard {
+        StoreGuard {
             guard,
             _not_send: PhantomData,
         }
@@ -1650,21 +1665,13 @@ impl Server {
 
     pub async fn snapshot<'a>(self: &'a Arc<Self>) -> ServerSnapshot<'a> {
         ServerSnapshot {
-            store: self.store.read().await,
+            store: self.store().await,
             peer: &self.peer,
         }
     }
 }
 
-impl<'a> Deref for StoreReadGuard<'a> {
-    type Target = Store;
-
-    fn deref(&self) -> &Self::Target {
-        &*self.guard
-    }
-}
-
-impl<'a> Deref for StoreWriteGuard<'a> {
+impl<'a> Deref for StoreGuard<'a> {
     type Target = Store;
 
     fn deref(&self) -> &Self::Target {
@@ -1672,13 +1679,13 @@ impl<'a> Deref for StoreWriteGuard<'a> {
     }
 }
 
-impl<'a> DerefMut for StoreWriteGuard<'a> {
+impl<'a> DerefMut for StoreGuard<'a> {
     fn deref_mut(&mut self) -> &mut Self::Target {
         &mut *self.guard
     }
 }
 
-impl<'a> Drop for StoreWriteGuard<'a> {
+impl<'a> Drop for StoreGuard<'a> {
     fn drop(&mut self) {
         #[cfg(test)]
         self.check_invariants();

crates/collab/src/rpc/store.rs 🔗

@@ -62,6 +62,7 @@ pub struct Worktree {
     #[serde(skip)]
     pub diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
     pub scan_id: u64,
+    pub is_complete: bool,
 }
 
 #[derive(Default)]
@@ -615,6 +616,7 @@ impl Store {
         removed_entries: &[u64],
         updated_entries: &[proto::Entry],
         scan_id: u64,
+        is_last_update: bool,
     ) -> Result<(Vec<ConnectionId>, bool, HashMap<String, usize>)> {
         let project = self.write_project(project_id, connection_id)?;
         let connection_ids = project.connection_ids();
@@ -657,6 +659,7 @@ impl Store {
         }
 
         worktree.scan_id = scan_id;
+        worktree.is_complete = is_last_update;
         Ok((
             connection_ids,
             metadata_changed,

crates/project/src/project.rs 🔗

@@ -507,10 +507,9 @@ impl Project {
 
         let mut worktrees = Vec::new();
         for worktree in response.worktrees {
-            let (worktree, load_task) = cx
+            let worktree = cx
                 .update(|cx| Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx));
             worktrees.push(worktree);
-            load_task.detach();
         }
 
         let (opened_buffer_tx, opened_buffer_rx) = watch::channel();
@@ -1102,7 +1101,7 @@ impl Project {
                     .ok_or_else(|| anyhow!("missing entry in response"))?;
                 worktree
                     .update(&mut cx, |worktree, cx| {
-                        worktree.as_remote().unwrap().insert_entry(
+                        worktree.as_remote_mut().unwrap().insert_entry(
                             entry,
                             response.worktree_scan_id as usize,
                             cx,
@@ -1145,7 +1144,7 @@ impl Project {
                     .ok_or_else(|| anyhow!("missing entry in response"))?;
                 worktree
                     .update(&mut cx, |worktree, cx| {
-                        worktree.as_remote().unwrap().insert_entry(
+                        worktree.as_remote_mut().unwrap().insert_entry(
                             entry,
                             response.worktree_scan_id as usize,
                             cx,
@@ -1188,7 +1187,7 @@ impl Project {
                     .ok_or_else(|| anyhow!("missing entry in response"))?;
                 worktree
                     .update(&mut cx, |worktree, cx| {
-                        worktree.as_remote().unwrap().insert_entry(
+                        worktree.as_remote_mut().unwrap().insert_entry(
                             entry,
                             response.worktree_scan_id as usize,
                             cx,
@@ -1221,7 +1220,7 @@ impl Project {
                     .await?;
                 worktree
                     .update(&mut cx, move |worktree, cx| {
-                        worktree.as_remote().unwrap().delete_entry(
+                        worktree.as_remote_mut().unwrap().delete_entry(
                             entry_id,
                             response.worktree_scan_id as usize,
                             cx,
@@ -1352,12 +1351,13 @@ impl Project {
             let client = self.client.clone();
             cx.foreground()
                 .spawn(async move {
-                    share.await?;
                     client.send(proto::RespondToJoinProjectRequest {
                         requester_id,
                         project_id,
                         allow,
-                    })
+                    })?;
+                    share.await?;
+                    anyhow::Ok(())
                 })
                 .detach_and_log_err(cx);
         }
@@ -4552,18 +4552,9 @@ impl Project {
                 {
                     this.worktrees.push(WorktreeHandle::Strong(old_worktree));
                 } else {
-                    let worktree = proto::Worktree {
-                        id: worktree.id,
-                        root_name: worktree.root_name,
-                        entries: Default::default(),
-                        diagnostic_summaries: Default::default(),
-                        visible: worktree.visible,
-                        scan_id: 0,
-                    };
-                    let (worktree, load_task) =
+                    let worktree =
                         Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx);
                     this.add_worktree(&worktree, cx);
-                    load_task.detach();
                 }
             }
 
@@ -4587,8 +4578,8 @@ impl Project {
             if let Some(worktree) = this.worktree_for_id(worktree_id, cx) {
                 worktree.update(cx, |worktree, _| {
                     let worktree = worktree.as_remote_mut().unwrap();
-                    worktree.update_from_remote(envelope)
-                })?;
+                    worktree.update_from_remote(envelope.payload);
+                });
             }
             Ok(())
         })
@@ -8125,7 +8116,10 @@ mod tests {
     }
 
     #[gpui::test(retries = 5)]
-    async fn test_rescan_and_remote_updates(cx: &mut gpui::TestAppContext) {
+    async fn test_rescan_and_remote_updates(
+        deterministic: Arc<Deterministic>,
+        cx: &mut gpui::TestAppContext,
+    ) {
         let dir = temp_tree(json!({
             "a": {
                 "file1": "",
@@ -8169,17 +8163,24 @@ mod tests {
         // Create a remote copy of this worktree.
         let tree = project.read_with(cx, |project, cx| project.worktrees(cx).next().unwrap());
         let initial_snapshot = tree.read_with(cx, |tree, _| tree.as_local().unwrap().snapshot());
-        let (remote, load_task) = cx.update(|cx| {
+        let remote = cx.update(|cx| {
             Worktree::remote(
                 1,
                 1,
-                initial_snapshot.to_proto(&Default::default(), true),
+                proto::WorktreeMetadata {
+                    id: initial_snapshot.id().to_proto(),
+                    root_name: initial_snapshot.root_name().into(),
+                    visible: true,
+                },
                 rpc.clone(),
                 cx,
             )
         });
-        // tree
-        load_task.await;
+        remote.update(cx, |remote, _| {
+            let update = initial_snapshot.build_initial_update(1);
+            remote.as_remote_mut().unwrap().update_from_remote(update);
+        });
+        deterministic.run_until_parked();
 
         cx.read(|cx| {
             assert!(!buffer2.read(cx).is_dirty());
@@ -8245,19 +8246,16 @@ mod tests {
         // Update the remote worktree. Check that it becomes consistent with the
         // local worktree.
         remote.update(cx, |remote, cx| {
-            let update_message = tree.read(cx).as_local().unwrap().snapshot().build_update(
+            let update = tree.read(cx).as_local().unwrap().snapshot().build_update(
                 &initial_snapshot,
                 1,
                 1,
                 true,
             );
-            remote
-                .as_remote_mut()
-                .unwrap()
-                .snapshot
-                .apply_remote_update(update_message)
-                .unwrap();
-
+            remote.as_remote_mut().unwrap().update_from_remote(update);
+        });
+        deterministic.run_until_parked();
+        remote.read_with(cx, |remote, _| {
             assert_eq!(
                 remote
                     .paths()

crates/project/src/worktree.rs 🔗

@@ -7,9 +7,9 @@ use super::{
 };
 use ::ignore::gitignore::{Gitignore, GitignoreBuilder};
 use anyhow::{anyhow, Context, Result};
-use client::{proto, Client, TypedEnvelope};
+use client::{proto, Client};
 use clock::ReplicaId;
-use collections::HashMap;
+use collections::{HashMap, VecDeque};
 use futures::{
     channel::{
         mpsc::{self, UnboundedSender},
@@ -40,11 +40,11 @@ use std::{
     ffi::{OsStr, OsString},
     fmt,
     future::Future,
-    mem,
     ops::{Deref, DerefMut},
     os::unix::prelude::{OsStrExt, OsStringExt},
     path::{Path, PathBuf},
     sync::{atomic::AtomicUsize, Arc},
+    task::Poll,
     time::{Duration, SystemTime},
 };
 use sum_tree::{Bias, Edit, SeekTarget, SumTree, TreeMap};
@@ -82,7 +82,7 @@ pub struct RemoteWorktree {
     project_id: u64,
     client: Arc<Client>,
     updates_tx: Option<UnboundedSender<proto::UpdateWorktree>>,
-    last_scan_id_rx: watch::Receiver<usize>,
+    snapshot_subscriptions: VecDeque<(usize, oneshot::Sender<()>)>,
     replica_id: ReplicaId,
     diagnostic_summaries: TreeMap<PathKey, DiagnosticSummary>,
     visible: bool,
@@ -96,6 +96,7 @@ pub struct Snapshot {
     entries_by_path: SumTree<Entry>,
     entries_by_id: SumTree<PathEntry>,
     scan_id: usize,
+    is_complete: bool,
 }
 
 #[derive(Clone)]
@@ -124,13 +125,16 @@ impl DerefMut for LocalSnapshot {
 #[derive(Clone, Debug)]
 enum ScanState {
     Idle,
-    Scanning,
+    /// The worktree is performing its initial scan of the filesystem.
+    Initializing,
+    /// The worktree is updating in response to filesystem events.
+    Updating,
     Err(Arc<anyhow::Error>),
 }
 
 struct ShareState {
     project_id: u64,
-    snapshots_tx: Sender<LocalSnapshot>,
+    snapshots_tx: watch::Sender<LocalSnapshot>,
     _maintain_remote_snapshot: Option<Task<Option<()>>>,
 }
 
@@ -171,10 +175,10 @@ impl Worktree {
     pub fn remote(
         project_remote_id: u64,
         replica_id: ReplicaId,
-        worktree: proto::Worktree,
+        worktree: proto::WorktreeMetadata,
         client: Arc<Client>,
         cx: &mut MutableAppContext,
-    ) -> (ModelHandle<Self>, Task<()>) {
+    ) -> ModelHandle<Self> {
         let remote_id = worktree.id;
         let root_char_bag: CharBag = worktree
             .root_name
@@ -189,13 +193,13 @@ impl Worktree {
             root_char_bag,
             entries_by_path: Default::default(),
             entries_by_id: Default::default(),
-            scan_id: worktree.scan_id as usize,
+            scan_id: 0,
+            is_complete: false,
         };
 
         let (updates_tx, mut updates_rx) = mpsc::unbounded();
         let background_snapshot = Arc::new(Mutex::new(snapshot.clone()));
         let (mut snapshot_updated_tx, mut snapshot_updated_rx) = watch::channel();
-        let (mut last_scan_id_tx, last_scan_id_rx) = watch::channel_with(worktree.scan_id as usize);
         let worktree_handle = cx.add_model(|_: &mut ModelContext<Worktree>| {
             Worktree::Remote(RemoteWorktree {
                 project_id: project_remote_id,
@@ -203,96 +207,50 @@ impl Worktree {
                 snapshot: snapshot.clone(),
                 background_snapshot: background_snapshot.clone(),
                 updates_tx: Some(updates_tx),
-                last_scan_id_rx,
+                snapshot_subscriptions: Default::default(),
                 client: client.clone(),
-                diagnostic_summaries: TreeMap::from_ordered_entries(
-                    worktree.diagnostic_summaries.into_iter().map(|summary| {
-                        (
-                            PathKey(PathBuf::from(summary.path).into()),
-                            DiagnosticSummary {
-                                language_server_id: summary.language_server_id as usize,
-                                error_count: summary.error_count as usize,
-                                warning_count: summary.warning_count as usize,
-                            },
-                        )
-                    }),
-                ),
+                diagnostic_summaries: Default::default(),
                 visible,
             })
         });
 
-        let deserialize_task = cx.spawn({
-            let worktree_handle = worktree_handle.clone();
-            |cx| async move {
-                let (entries_by_path, entries_by_id) = cx
-                    .background()
-                    .spawn(async move {
-                        let mut entries_by_path_edits = Vec::new();
-                        let mut entries_by_id_edits = Vec::new();
-                        for entry in worktree.entries {
-                            match Entry::try_from((&root_char_bag, entry)) {
-                                Ok(entry) => {
-                                    entries_by_id_edits.push(Edit::Insert(PathEntry {
-                                        id: entry.id,
-                                        path: entry.path.clone(),
-                                        is_ignored: entry.is_ignored,
-                                        scan_id: 0,
-                                    }));
-                                    entries_by_path_edits.push(Edit::Insert(entry));
-                                }
-                                Err(err) => log::warn!("error for remote worktree entry {:?}", err),
-                            }
-                        }
-
-                        let mut entries_by_path = SumTree::new();
-                        let mut entries_by_id = SumTree::new();
-                        entries_by_path.edit(entries_by_path_edits, &());
-                        entries_by_id.edit(entries_by_id_edits, &());
-
-                        (entries_by_path, entries_by_id)
-                    })
-                    .await;
-
-                {
-                    let mut snapshot = background_snapshot.lock();
-                    snapshot.entries_by_path = entries_by_path;
-                    snapshot.entries_by_id = entries_by_id;
+        cx.background()
+            .spawn(async move {
+                while let Some(update) = updates_rx.next().await {
+                    if let Err(error) = background_snapshot.lock().apply_remote_update(update) {
+                        log::error!("error applying worktree update: {}", error);
+                    }
                     snapshot_updated_tx.send(()).await.ok();
                 }
+            })
+            .detach();
 
-                cx.background()
-                    .spawn(async move {
-                        while let Some(update) = updates_rx.next().await {
-                            if let Err(error) =
-                                background_snapshot.lock().apply_remote_update(update)
-                            {
-                                log::error!("error applying worktree update: {}", error);
-                            }
-                            snapshot_updated_tx.send(()).await.ok();
-                        }
-                    })
-                    .detach();
-
-                cx.spawn(|mut cx| {
-                    let this = worktree_handle.downgrade();
-                    async move {
-                        while let Some(_) = snapshot_updated_rx.recv().await {
-                            if let Some(this) = this.upgrade(&cx) {
-                                this.update(&mut cx, |this, cx| {
-                                    this.poll_snapshot(cx);
-                                    let this = this.as_remote_mut().unwrap();
-                                    *last_scan_id_tx.borrow_mut() = this.snapshot.scan_id;
-                                });
-                            } else {
-                                break;
+        cx.spawn(|mut cx| {
+            let this = worktree_handle.downgrade();
+            async move {
+                while let Some(_) = snapshot_updated_rx.recv().await {
+                    if let Some(this) = this.upgrade(&cx) {
+                        this.update(&mut cx, |this, cx| {
+                            this.poll_snapshot(cx);
+                            let this = this.as_remote_mut().unwrap();
+                            while let Some((scan_id, _)) = this.snapshot_subscriptions.front() {
+                                if this.observed_snapshot(*scan_id) {
+                                    let (_, tx) = this.snapshot_subscriptions.pop_front().unwrap();
+                                    let _ = tx.send(());
+                                } else {
+                                    break;
+                                }
                             }
-                        }
+                        });
+                    } else {
+                        break;
                     }
-                })
-                .detach();
+                }
             }
-        });
-        (worktree_handle, deserialize_task)
+        })
+        .detach();
+
+        worktree_handle
     }
 
     pub fn as_local(&self) -> Option<&LocalWorktree> {
@@ -376,38 +334,9 @@ impl Worktree {
 
     fn poll_snapshot(&mut self, cx: &mut ModelContext<Self>) {
         match self {
-            Self::Local(worktree) => {
-                let is_fake_fs = worktree.fs.is_fake();
-                worktree.snapshot = worktree.background_snapshot.lock().clone();
-                if worktree.is_scanning() {
-                    if worktree.poll_task.is_none() {
-                        worktree.poll_task = Some(cx.spawn_weak(|this, mut cx| async move {
-                            if is_fake_fs {
-                                #[cfg(any(test, feature = "test-support"))]
-                                cx.background().simulate_random_delay().await;
-                            } else {
-                                smol::Timer::after(Duration::from_millis(100)).await;
-                            }
-                            if let Some(this) = this.upgrade(&cx) {
-                                this.update(&mut cx, |this, cx| {
-                                    this.as_local_mut().unwrap().poll_task = None;
-                                    this.poll_snapshot(cx);
-                                });
-                            }
-                        }));
-                    }
-                } else {
-                    worktree.poll_task.take();
-                    cx.emit(Event::UpdatedEntries);
-                }
-            }
-            Self::Remote(worktree) => {
-                worktree.snapshot = worktree.background_snapshot.lock().clone();
-                cx.emit(Event::UpdatedEntries);
-            }
+            Self::Local(worktree) => worktree.poll_snapshot(false, cx),
+            Self::Remote(worktree) => worktree.poll_snapshot(cx),
         };
-
-        cx.notify();
     }
 }
 
@@ -435,7 +364,8 @@ impl LocalWorktree {
             .context("failed to stat worktree path")?;
 
         let (scan_states_tx, mut scan_states_rx) = mpsc::unbounded();
-        let (mut last_scan_state_tx, last_scan_state_rx) = watch::channel_with(ScanState::Scanning);
+        let (mut last_scan_state_tx, last_scan_state_rx) =
+            watch::channel_with(ScanState::Initializing);
         let tree = cx.add_model(move |cx: &mut ModelContext<Worktree>| {
             let mut snapshot = LocalSnapshot {
                 abs_path,
@@ -449,6 +379,7 @@ impl LocalWorktree {
                     entries_by_path: Default::default(),
                     entries_by_id: Default::default(),
                     scan_id: 0,
+                    is_complete: true,
                 },
             };
             if let Some(metadata) = metadata {
@@ -479,11 +410,7 @@ impl LocalWorktree {
                 while let Some(scan_state) = scan_states_rx.next().await {
                     if let Some(this) = this.upgrade(&cx) {
                         last_scan_state_tx.blocking_send(scan_state).ok();
-                        this.update(&mut cx, |this, cx| {
-                            this.poll_snapshot(cx);
-                            this.as_local().unwrap().broadcast_snapshot()
-                        })
-                        .await;
+                        this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
                     } else {
                         break;
                     }
@@ -567,22 +494,53 @@ impl LocalWorktree {
         Ok(updated)
     }
 
+    fn poll_snapshot(&mut self, force: bool, cx: &mut ModelContext<Worktree>) {
+        self.poll_task.take();
+        match self.scan_state() {
+            ScanState::Idle => {
+                self.snapshot = self.background_snapshot.lock().clone();
+                if let Some(share) = self.share.as_mut() {
+                    *share.snapshots_tx.borrow_mut() = self.snapshot.clone();
+                }
+                cx.emit(Event::UpdatedEntries);
+            }
+            ScanState::Initializing => {
+                let is_fake_fs = self.fs.is_fake();
+                self.snapshot = self.background_snapshot.lock().clone();
+                self.poll_task = Some(cx.spawn_weak(|this, mut cx| async move {
+                    if is_fake_fs {
+                        #[cfg(any(test, feature = "test-support"))]
+                        cx.background().simulate_random_delay().await;
+                    } else {
+                        smol::Timer::after(Duration::from_millis(100)).await;
+                    }
+                    if let Some(this) = this.upgrade(&cx) {
+                        this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
+                    }
+                }));
+                cx.emit(Event::UpdatedEntries);
+            }
+            _ => {
+                if force {
+                    self.snapshot = self.background_snapshot.lock().clone();
+                }
+            }
+        }
+        cx.notify();
+    }
+
     pub fn scan_complete(&self) -> impl Future<Output = ()> {
         let mut scan_state_rx = self.last_scan_state_rx.clone();
         async move {
             let mut scan_state = Some(scan_state_rx.borrow().clone());
-            while let Some(ScanState::Scanning) = scan_state {
+            while let Some(ScanState::Initializing | ScanState::Updating) = scan_state {
                 scan_state = scan_state_rx.recv().await;
             }
         }
     }
 
-    fn is_scanning(&self) -> bool {
-        if let ScanState::Scanning = *self.last_scan_state_rx.borrow() {
-            true
-        } else {
-            false
-        }
+    fn scan_state(&self) -> ScanState {
+        self.last_scan_state_rx.borrow().clone()
     }
 
     pub fn snapshot(&self) -> LocalSnapshot {
@@ -612,7 +570,6 @@ impl LocalWorktree {
                         .refresh_entry(path, abs_path, None, cx)
                 })
                 .await?;
-            this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
             Ok((
                 File {
                     entry_id: Some(entry.id),
@@ -710,16 +667,14 @@ impl LocalWorktree {
 
         Some(cx.spawn(|this, mut cx| async move {
             delete.await?;
-            this.update(&mut cx, |this, _| {
+            this.update(&mut cx, |this, cx| {
                 let this = this.as_local_mut().unwrap();
-                let mut snapshot = this.background_snapshot.lock();
-                snapshot.delete_entry(entry_id);
+                {
+                    let mut snapshot = this.background_snapshot.lock();
+                    snapshot.delete_entry(entry_id);
+                }
+                this.poll_snapshot(true, cx);
             });
-            this.update(&mut cx, |this, cx| {
-                this.poll_snapshot(cx);
-                this.as_local().unwrap().broadcast_snapshot()
-            })
-            .await;
             Ok(())
         }))
     }
@@ -755,11 +710,6 @@ impl LocalWorktree {
                     )
                 })
                 .await?;
-            this.update(&mut cx, |this, cx| {
-                this.poll_snapshot(cx);
-                this.as_local().unwrap().broadcast_snapshot()
-            })
-            .await;
             Ok(entry)
         }))
     }
@@ -795,11 +745,6 @@ impl LocalWorktree {
                     )
                 })
                 .await?;
-            this.update(&mut cx, |this, cx| {
-                this.poll_snapshot(cx);
-                this.as_local().unwrap().broadcast_snapshot()
-            })
-            .await;
             Ok(entry)
         }))
     }
@@ -833,11 +778,6 @@ impl LocalWorktree {
                         .refresh_entry(path, abs_path, None, cx)
                 })
                 .await?;
-            this.update(&mut cx, |this, cx| {
-                this.poll_snapshot(cx);
-                this.as_local().unwrap().broadcast_snapshot()
-            })
-            .await;
             Ok(entry)
         })
     }
@@ -870,61 +810,55 @@ impl LocalWorktree {
             let this = this
                 .upgrade(&cx)
                 .ok_or_else(|| anyhow!("worktree was dropped"))?;
-            let (entry, snapshot, snapshots_tx) = this.read_with(&cx, |this, _| {
-                let this = this.as_local().unwrap();
-                let mut snapshot = this.background_snapshot.lock();
-                entry.is_ignored = snapshot
-                    .ignore_stack_for_path(&path, entry.is_dir())
-                    .is_path_ignored(&path, entry.is_dir());
-                if let Some(old_path) = old_path {
-                    snapshot.remove_path(&old_path);
+            this.update(&mut cx, |this, cx| {
+                let this = this.as_local_mut().unwrap();
+                let inserted_entry;
+                {
+                    let mut snapshot = this.background_snapshot.lock();
+                    entry.is_ignored = snapshot
+                        .ignore_stack_for_path(&path, entry.is_dir())
+                        .is_path_ignored(&path, entry.is_dir());
+                    if let Some(old_path) = old_path {
+                        snapshot.remove_path(&old_path);
+                    }
+                    inserted_entry = snapshot.insert_entry(entry, fs.as_ref());
+                    snapshot.scan_id += 1;
                 }
-                let entry = snapshot.insert_entry(entry, fs.as_ref());
-                snapshot.scan_id += 1;
-                let snapshots_tx = this.share.as_ref().map(|s| s.snapshots_tx.clone());
-                (entry, snapshot.clone(), snapshots_tx)
-            });
-            this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
-
-            if let Some(snapshots_tx) = snapshots_tx {
-                snapshots_tx.send(snapshot).await.ok();
-            }
-
-            Ok(entry)
+                this.poll_snapshot(true, cx);
+                Ok(inserted_entry)
+            })
         })
     }
 
     pub fn share(&mut self, project_id: u64, cx: &mut ModelContext<Worktree>) -> Task<Result<()>> {
         let (share_tx, share_rx) = oneshot::channel();
-        let (snapshots_to_send_tx, snapshots_to_send_rx) =
-            smol::channel::unbounded::<LocalSnapshot>();
+
         if self.share.is_some() {
             let _ = share_tx.send(Ok(()));
         } else {
+            let (snapshots_tx, mut snapshots_rx) = watch::channel_with(self.snapshot());
             let rpc = self.client.clone();
             let worktree_id = cx.model_id() as u64;
             let maintain_remote_snapshot = cx.background().spawn({
                 let rpc = rpc.clone();
                 let diagnostic_summaries = self.diagnostic_summaries.clone();
                 async move {
-                    let mut prev_snapshot = match snapshots_to_send_rx.recv().await {
-                        Ok(snapshot) => {
-                            if let Err(error) = rpc
-                                .request(proto::UpdateWorktree {
-                                    project_id,
-                                    worktree_id,
-                                    root_name: snapshot.root_name().to_string(),
-                                    updated_entries: snapshot
-                                        .entries_by_path
-                                        .iter()
-                                        .filter(|e| !e.is_ignored)
-                                        .map(Into::into)
-                                        .collect(),
-                                    removed_entries: Default::default(),
-                                    scan_id: snapshot.scan_id as u64,
-                                })
-                                .await
-                            {
+                    let mut prev_snapshot = match snapshots_rx.recv().await {
+                        Some(snapshot) => {
+                            let update = proto::UpdateWorktree {
+                                project_id,
+                                worktree_id,
+                                root_name: snapshot.root_name().to_string(),
+                                updated_entries: snapshot
+                                    .entries_by_path
+                                    .iter()
+                                    .map(Into::into)
+                                    .collect(),
+                                removed_entries: Default::default(),
+                                scan_id: snapshot.scan_id as u64,
+                                is_last_update: true,
+                            };
+                            if let Err(error) = send_worktree_update(&rpc, update).await {
                                 let _ = share_tx.send(Err(error));
                                 return Err(anyhow!("failed to send initial update worktree"));
                             } else {
@@ -932,8 +866,10 @@ impl LocalWorktree {
                                 snapshot
                             }
                         }
-                        Err(error) => {
-                            let _ = share_tx.send(Err(error.into()));
+                        None => {
+                            share_tx
+                                .send(Err(anyhow!("worktree dropped before share completed")))
+                                .ok();
                             return Err(anyhow!("failed to send initial update worktree"));
                         }
                     };
@@ -946,44 +882,12 @@ impl LocalWorktree {
                         })?;
                     }
 
-                    // Stream ignored entries in chunks.
-                    {
-                        let mut ignored_entries = prev_snapshot
-                            .entries_by_path
-                            .iter()
-                            .filter(|e| e.is_ignored);
-                        let mut ignored_entries_to_send = Vec::new();
-                        loop {
-                            #[cfg(any(test, feature = "test-support"))]
-                            const CHUNK_SIZE: usize = 2;
-                            #[cfg(not(any(test, feature = "test-support")))]
-                            const CHUNK_SIZE: usize = 256;
-
-                            let entry = ignored_entries.next();
-                            if ignored_entries_to_send.len() >= CHUNK_SIZE || entry.is_none() {
-                                rpc.request(proto::UpdateWorktree {
-                                    project_id,
-                                    worktree_id,
-                                    root_name: prev_snapshot.root_name().to_string(),
-                                    updated_entries: mem::take(&mut ignored_entries_to_send),
-                                    removed_entries: Default::default(),
-                                    scan_id: prev_snapshot.scan_id as u64,
-                                })
-                                .await?;
-                            }
-
-                            if let Some(entry) = entry {
-                                ignored_entries_to_send.push(entry.into());
-                            } else {
-                                break;
-                            }
-                        }
-                    }
-
-                    while let Ok(snapshot) = snapshots_to_send_rx.recv().await {
-                        let message =
-                            snapshot.build_update(&prev_snapshot, project_id, worktree_id, true);
-                        rpc.request(message).await?;
+                    while let Some(snapshot) = snapshots_rx.recv().await {
+                        send_worktree_update(
+                            &rpc,
+                            snapshot.build_update(&prev_snapshot, project_id, worktree_id, true),
+                        )
+                        .await?;
                         prev_snapshot = snapshot;
                     }
 
@@ -993,18 +897,12 @@ impl LocalWorktree {
             });
             self.share = Some(ShareState {
                 project_id,
-                snapshots_tx: snapshots_to_send_tx.clone(),
+                snapshots_tx,
                 _maintain_remote_snapshot: Some(maintain_remote_snapshot),
             });
         }
 
-        cx.spawn_weak(|this, cx| async move {
-            if let Some(this) = this.upgrade(&cx) {
-                this.read_with(&cx, |this, _| {
-                    let this = this.as_local().unwrap();
-                    let _ = snapshots_to_send_tx.try_send(this.snapshot());
-                });
-            }
+        cx.foreground().spawn(async move {
             share_rx
                 .await
                 .unwrap_or_else(|_| Err(anyhow!("share ended")))
@@ -1018,23 +916,6 @@ impl LocalWorktree {
     pub fn is_shared(&self) -> bool {
         self.share.is_some()
     }
-
-    fn broadcast_snapshot(&self) -> impl Future<Output = ()> {
-        let mut to_send = None;
-        if !self.is_scanning() {
-            if let Some(share) = self.share.as_ref() {
-                to_send = Some((self.snapshot(), share.snapshots_tx.clone()));
-            }
-        }
-
-        async move {
-            if let Some((snapshot, snapshots_to_send_tx)) = to_send {
-                if let Err(err) = snapshots_to_send_tx.send(snapshot).await {
-                    log::error!("error submitting snapshot to send {}", err);
-                }
-            }
-        }
-    }
 }
 
 impl RemoteWorktree {
@@ -1042,31 +923,45 @@ impl RemoteWorktree {
         self.snapshot.clone()
     }
 
+    fn poll_snapshot(&mut self, cx: &mut ModelContext<Worktree>) {
+        self.snapshot = self.background_snapshot.lock().clone();
+        cx.emit(Event::UpdatedEntries);
+        cx.notify();
+    }
+
     pub fn disconnected_from_host(&mut self) {
         self.updates_tx.take();
+        self.snapshot_subscriptions.clear();
     }
 
-    pub fn update_from_remote(
-        &mut self,
-        envelope: TypedEnvelope<proto::UpdateWorktree>,
-    ) -> Result<()> {
+    pub fn update_from_remote(&mut self, update: proto::UpdateWorktree) {
         if let Some(updates_tx) = &self.updates_tx {
             updates_tx
-                .unbounded_send(envelope.payload)
+                .unbounded_send(update)
                 .expect("consumer runs to completion");
         }
-        Ok(())
     }
 
-    fn wait_for_snapshot(&self, scan_id: usize) -> impl Future<Output = ()> {
-        let mut rx = self.last_scan_id_rx.clone();
-        async move {
-            while let Some(applied_scan_id) = rx.next().await {
-                if applied_scan_id >= scan_id {
-                    return;
-                }
+    fn observed_snapshot(&self, scan_id: usize) -> bool {
+        self.scan_id > scan_id || (self.scan_id == scan_id && self.is_complete)
+    }
+
+    fn wait_for_snapshot(&mut self, scan_id: usize) -> impl Future<Output = ()> {
+        let (tx, rx) = oneshot::channel();
+        if self.observed_snapshot(scan_id) {
+            let _ = tx.send(());
+        } else {
+            match self
+                .snapshot_subscriptions
+                .binary_search_by_key(&scan_id, |probe| probe.0)
+            {
+                Ok(ix) | Err(ix) => self.snapshot_subscriptions.insert(ix, (scan_id, tx)),
             }
         }
+
+        async move {
+            let _ = rx.await;
+        }
     }
 
     pub fn update_diagnostic_summary(
@@ -1088,7 +983,7 @@ impl RemoteWorktree {
     }
 
     pub fn insert_entry(
-        &self,
+        &mut self,
         entry: proto::Entry,
         scan_id: usize,
         cx: &mut ModelContext<Worktree>,
@@ -1107,7 +1002,7 @@ impl RemoteWorktree {
     }
 
     pub(crate) fn delete_entry(
-        &self,
+        &mut self,
         id: ProjectEntryId,
         scan_id: usize,
         cx: &mut ModelContext<Worktree>,
@@ -1183,7 +1078,7 @@ impl Snapshot {
         for entry_id in update.removed_entries {
             let entry = self
                 .entry_for_id(ProjectEntryId::from_proto(entry_id))
-                .ok_or_else(|| anyhow!("unknown entry"))?;
+                .ok_or_else(|| anyhow!("unknown entry {}", entry_id))?;
             entries_by_path_edits.push(Edit::Remove(PathKey(entry.path.clone())));
             entries_by_id_edits.push(Edit::Remove(entry.id));
         }
@@ -1205,6 +1100,7 @@ impl Snapshot {
         self.entries_by_path.edit(entries_by_path_edits, &());
         self.entries_by_id.edit(entries_by_id_edits, &());
         self.scan_id = update.scan_id as usize;
+        self.is_complete = update.is_last_update;
 
         Ok(())
     }
@@ -1326,27 +1222,16 @@ impl LocalSnapshot {
     }
 
     #[cfg(test)]
-    pub(crate) fn to_proto(
-        &self,
-        diagnostic_summaries: &TreeMap<PathKey, DiagnosticSummary>,
-        visible: bool,
-    ) -> proto::Worktree {
+    pub(crate) fn build_initial_update(&self, project_id: u64) -> proto::UpdateWorktree {
         let root_name = self.root_name.clone();
-        proto::Worktree {
-            id: self.id.0 as u64,
+        proto::UpdateWorktree {
+            project_id,
+            worktree_id: self.id().to_proto(),
             root_name,
-            entries: self
-                .entries_by_path
-                .iter()
-                .filter(|e| !e.is_ignored)
-                .map(Into::into)
-                .collect(),
-            diagnostic_summaries: diagnostic_summaries
-                .iter()
-                .map(|(path, summary)| summary.to_proto(&path.0))
-                .collect(),
-            visible,
+            updated_entries: self.entries_by_path.iter().map(Into::into).collect(),
+            removed_entries: Default::default(),
             scan_id: self.scan_id as u64,
+            is_last_update: true,
         }
     }
 
@@ -1413,6 +1298,7 @@ impl LocalSnapshot {
             updated_entries,
             removed_entries,
             scan_id: self.scan_id as u64,
+            is_last_update: true,
         }
     }
 
@@ -2050,7 +1936,7 @@ impl BackgroundScanner {
     }
 
     async fn run(mut self, events_rx: impl Stream<Item = Vec<fsevent::Event>>) {
-        if self.notify.unbounded_send(ScanState::Scanning).is_err() {
+        if self.notify.unbounded_send(ScanState::Initializing).is_err() {
             return;
         }
 
@@ -2069,8 +1955,13 @@ impl BackgroundScanner {
         }
 
         futures::pin_mut!(events_rx);
-        while let Some(events) = events_rx.next().await {
-            if self.notify.unbounded_send(ScanState::Scanning).is_err() {
+
+        while let Some(mut events) = events_rx.next().await {
+            while let Poll::Ready(Some(additional_events)) = futures::poll!(events_rx.next()) {
+                events.extend(additional_events);
+            }
+
+            if self.notify.unbounded_send(ScanState::Updating).is_err() {
                 break;
             }
 
@@ -2722,6 +2613,19 @@ impl<'a> TryFrom<(&'a CharBag, proto::Entry)> for Entry {
     }
 }
 
+async fn send_worktree_update(client: &Arc<Client>, update: proto::UpdateWorktree) -> Result<()> {
+    #[cfg(any(test, feature = "test-support"))]
+    const MAX_CHUNK_SIZE: usize = 2;
+    #[cfg(not(any(test, feature = "test-support")))]
+    const MAX_CHUNK_SIZE: usize = 256;
+
+    for update in proto::split_worktree_update(update, MAX_CHUNK_SIZE) {
+        client.request(update).await?;
+    }
+
+    Ok(())
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -2931,6 +2835,7 @@ mod tests {
                 root_name: Default::default(),
                 root_char_bag: Default::default(),
                 scan_id: 0,
+                is_complete: true,
             },
         };
         initial_snapshot.insert_entry(

crates/rpc/proto/zed.proto 🔗

@@ -168,7 +168,7 @@ message JoinProjectResponse {
 
     message Accept {
         uint32 replica_id = 1;
-        repeated Worktree worktrees = 2;
+        repeated WorktreeMetadata worktrees = 2;
         repeated Collaborator collaborators = 3;
         repeated LanguageServer language_servers = 4;        
     }
@@ -195,6 +195,7 @@ message UpdateWorktree {
     repeated Entry updated_entries = 4;
     repeated uint64 removed_entries = 5;
     uint64 scan_id = 6;
+    bool is_last_update = 7;
 }
 
 message CreateProjectEntry {
@@ -765,15 +766,6 @@ message User {
     string avatar_url = 3;
 }
 
-message Worktree {
-    uint64 id = 1;
-    string root_name = 2;
-    repeated Entry entries = 3;
-    repeated DiagnosticSummary diagnostic_summaries = 4;
-    bool visible = 5;
-    uint64 scan_id = 6;
-}
-
 message File {
     uint64 worktree_id = 1;
     optional uint64 entry_id = 2;

crates/rpc/src/proto.rs 🔗

@@ -5,6 +5,7 @@ use futures::{SinkExt as _, StreamExt as _};
 use prost::Message as _;
 use serde::Serialize;
 use std::any::{Any, TypeId};
+use std::{cmp, iter, mem};
 use std::{
     fmt::Debug,
     io,
@@ -390,6 +391,31 @@ impl From<Nonce> for u128 {
     }
 }
 
+pub fn split_worktree_update(
+    mut message: UpdateWorktree,
+    max_chunk_size: usize,
+) -> impl Iterator<Item = UpdateWorktree> {
+    let mut done = false;
+    iter::from_fn(move || {
+        if done {
+            return None;
+        }
+
+        let chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size);
+        let updated_entries = message.updated_entries.drain(..chunk_size).collect();
+        done = message.updated_entries.is_empty();
+        Some(UpdateWorktree {
+            project_id: message.project_id,
+            worktree_id: message.worktree_id,
+            root_name: message.root_name.clone(),
+            updated_entries,
+            removed_entries: mem::take(&mut message.removed_entries),
+            scan_id: message.scan_id,
+            is_last_update: done && message.is_last_update,
+        })
+    })
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;