From 484af8c7c488ba6180e1852bde89ab344c6c48ae Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Thu, 30 Jun 2022 16:49:56 +0200 Subject: [PATCH] Split worktree updates when a peer joins an already-shared project --- crates/collab/src/integration_tests.rs | 1 + crates/collab/src/rpc.rs | 63 +++++++--- crates/project/src/project.rs | 52 ++++---- crates/project/src/worktree.rs | 167 ++++++------------------- crates/rpc/proto/zed.proto | 12 +- crates/rpc/src/proto.rs | 26 ++++ 6 files changed, 137 insertions(+), 184 deletions(-) diff --git a/crates/collab/src/integration_tests.rs b/crates/collab/src/integration_tests.rs index facef17b63b52192f668fa1aaee0aeb777e7a665..3da1fc86925e8e36fcaeed0c3b17a2d2b1f0cd6d 100644 --- a/crates/collab/src/integration_tests.rs +++ b/crates/collab/src/integration_tests.rs @@ -1472,6 +1472,7 @@ async fn test_collaborating_with_diagnostics( // Join project as client C and observe the diagnostics. let project_c = client_c.build_remote_project(&project_a, cx_a, cx_c).await; + deterministic.run_until_parked(); project_c.read_with(cx_c, |project, cx| { assert_eq!( project.diagnostic_summaries(cx).collect::>(), diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index e5a1f4dd1aead4ba3c3a917a9b39b2b4c5dda85c..b3dc965ff3cfb147fd197fe93c2a00191d0b3438 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -791,21 +791,10 @@ impl Server { let worktrees = project .worktrees .iter() - .filter_map(|(id, shared_worktree)| { - let worktree = project.worktrees.get(&id)?; - Some(proto::Worktree { - id: *id, - root_name: worktree.root_name.clone(), - entries: shared_worktree.entries.values().cloned().collect(), - diagnostic_summaries: shared_worktree - .diagnostic_summaries - .values() - .cloned() - .collect(), - visible: worktree.visible, - scan_id: shared_worktree.scan_id, - is_complete: worktree.is_complete, - }) + .map(|(id, worktree)| proto::WorktreeMetadata { + id: *id, + root_name: worktree.root_name.clone(), + visible: worktree.visible, }) .collect::>(); @@ -841,14 +830,15 @@ impl Server { } } - for (receipt, replica_id) in receipts_with_replica_ids { + // First, we send the metadata associated with each worktree. + for (receipt, replica_id) in &receipts_with_replica_ids { self.peer.respond( - receipt, + receipt.clone(), proto::JoinProjectResponse { variant: Some(proto::join_project_response::Variant::Accept( proto::join_project_response::Accept { worktrees: worktrees.clone(), - replica_id: replica_id as u32, + replica_id: *replica_id as u32, collaborators: collaborators.clone(), language_servers: project.language_servers.clone(), }, @@ -856,6 +846,43 @@ impl Server { }, )?; } + + for (worktree_id, worktree) in &project.worktrees { + #[cfg(any(test, feature = "test-support"))] + const MAX_CHUNK_SIZE: usize = 2; + #[cfg(not(any(test, feature = "test-support")))] + const MAX_CHUNK_SIZE: usize = 256; + + // Stream this worktree's entries. + let message = proto::UpdateWorktree { + project_id: project_id.to_proto(), + worktree_id: *worktree_id, + root_name: worktree.root_name.clone(), + updated_entries: worktree.entries.values().cloned().collect(), + removed_entries: Default::default(), + scan_id: worktree.scan_id, + is_last_update: worktree.is_complete, + }; + for update in proto::split_worktree_update(message, MAX_CHUNK_SIZE) { + for (receipt, _) in &receipts_with_replica_ids { + self.peer.send(receipt.sender_id, update.clone())?; + } + } + + // Stream this worktree's diagnostics. + for summary in worktree.diagnostic_summaries.values() { + for (receipt, _) in &receipts_with_replica_ids { + self.peer.send( + receipt.sender_id, + proto::UpdateDiagnosticSummary { + project_id: project_id.to_proto(), + worktree_id: *worktree_id, + summary: Some(summary.clone()), + }, + )?; + } + } + } } self.update_user_contacts(host_user_id).await?; diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 10485b79e4cae56e2394ee28b3d42ff1969a46fe..806498a22402fb6c8b254b6c673600871250b39a 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -487,10 +487,9 @@ impl Project { let mut worktrees = Vec::new(); for worktree in response.worktrees { - let (worktree, load_task) = cx + let worktree = cx .update(|cx| Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx)); worktrees.push(worktree); - load_task.detach(); } let (opened_buffer_tx, opened_buffer_rx) = watch::channel(); @@ -4441,19 +4440,9 @@ impl Project { { this.worktrees.push(WorktreeHandle::Strong(old_worktree)); } else { - let worktree = proto::Worktree { - id: worktree.id, - root_name: worktree.root_name, - entries: Default::default(), - diagnostic_summaries: Default::default(), - visible: worktree.visible, - scan_id: 0, - is_complete: false, - }; - let (worktree, load_task) = + let worktree = Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx); this.add_worktree(&worktree, cx); - load_task.detach(); } } @@ -4477,8 +4466,8 @@ impl Project { if let Some(worktree) = this.worktree_for_id(worktree_id, cx) { worktree.update(cx, |worktree, _| { let worktree = worktree.as_remote_mut().unwrap(); - worktree.update_from_remote(envelope) - })?; + worktree.update_from_remote(envelope.payload); + }); } Ok(()) }) @@ -7996,7 +7985,10 @@ mod tests { } #[gpui::test(retries = 5)] - async fn test_rescan_and_remote_updates(cx: &mut gpui::TestAppContext) { + async fn test_rescan_and_remote_updates( + deterministic: Arc, + cx: &mut gpui::TestAppContext, + ) { let dir = temp_tree(json!({ "a": { "file1": "", @@ -8040,17 +8032,24 @@ mod tests { // Create a remote copy of this worktree. let tree = project.read_with(cx, |project, cx| project.worktrees(cx).next().unwrap()); let initial_snapshot = tree.read_with(cx, |tree, _| tree.as_local().unwrap().snapshot()); - let (remote, load_task) = cx.update(|cx| { + let remote = cx.update(|cx| { Worktree::remote( 1, 1, - initial_snapshot.to_proto(&Default::default(), true), + proto::WorktreeMetadata { + id: initial_snapshot.id().to_proto(), + root_name: initial_snapshot.root_name().into(), + visible: true, + }, rpc.clone(), cx, ) }); - // tree - load_task.await; + remote.update(cx, |remote, _| { + let update = initial_snapshot.build_initial_update(1); + remote.as_remote_mut().unwrap().update_from_remote(update); + }); + deterministic.run_until_parked(); cx.read(|cx| { assert!(!buffer2.read(cx).is_dirty()); @@ -8116,19 +8115,16 @@ mod tests { // Update the remote worktree. Check that it becomes consistent with the // local worktree. remote.update(cx, |remote, cx| { - let update_message = tree.read(cx).as_local().unwrap().snapshot().build_update( + let update = tree.read(cx).as_local().unwrap().snapshot().build_update( &initial_snapshot, 1, 1, true, ); - remote - .as_remote_mut() - .unwrap() - .snapshot - .apply_remote_update(update_message) - .unwrap(); - + remote.as_remote_mut().unwrap().update_from_remote(update); + }); + deterministic.run_until_parked(); + remote.read_with(cx, |remote, _| { assert_eq!( remote .paths() diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index 6e183aa127adc253abc1e3e18f110149e19a935a..217a512c71133ba9f401d8cbacbc74deeae9bff2 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -7,7 +7,7 @@ use super::{ }; use ::ignore::gitignore::{Gitignore, GitignoreBuilder}; use anyhow::{anyhow, Context, Result}; -use client::{proto, Client, TypedEnvelope}; +use client::{proto, Client}; use clock::ReplicaId; use collections::HashMap; use futures::{ @@ -40,7 +40,6 @@ use std::{ ffi::{OsStr, OsString}, fmt, future::Future, - mem, ops::{Deref, DerefMut}, os::unix::prelude::{OsStrExt, OsStringExt}, path::{Path, PathBuf}, @@ -173,10 +172,10 @@ impl Worktree { pub fn remote( project_remote_id: u64, replica_id: ReplicaId, - worktree: proto::Worktree, + worktree: proto::WorktreeMetadata, client: Arc, cx: &mut MutableAppContext, - ) -> (ModelHandle, Task<()>) { + ) -> ModelHandle { let remote_id = worktree.id; let root_char_bag: CharBag = worktree .root_name @@ -191,8 +190,8 @@ impl Worktree { root_char_bag, entries_by_path: Default::default(), entries_by_id: Default::default(), - scan_id: worktree.scan_id as usize, - is_complete: worktree.is_complete, + scan_id: 0, + is_complete: false, }; let (updates_tx, mut updates_rx) = mpsc::unbounded(); @@ -207,90 +206,37 @@ impl Worktree { updates_tx: Some(updates_tx), snapshot_updated_rx: snapshot_updated_rx.clone(), client: client.clone(), - diagnostic_summaries: TreeMap::from_ordered_entries( - worktree.diagnostic_summaries.into_iter().map(|summary| { - ( - PathKey(PathBuf::from(summary.path).into()), - DiagnosticSummary { - language_server_id: summary.language_server_id as usize, - error_count: summary.error_count as usize, - warning_count: summary.warning_count as usize, - }, - ) - }), - ), + diagnostic_summaries: Default::default(), visible, }) }); - let deserialize_task = cx.spawn({ - let worktree_handle = worktree_handle.clone(); - |cx| async move { - let (entries_by_path, entries_by_id) = cx - .background() - .spawn(async move { - let mut entries_by_path_edits = Vec::new(); - let mut entries_by_id_edits = Vec::new(); - for entry in worktree.entries { - match Entry::try_from((&root_char_bag, entry)) { - Ok(entry) => { - entries_by_id_edits.push(Edit::Insert(PathEntry { - id: entry.id, - path: entry.path.clone(), - is_ignored: entry.is_ignored, - scan_id: 0, - })); - entries_by_path_edits.push(Edit::Insert(entry)); - } - Err(err) => log::warn!("error for remote worktree entry {:?}", err), - } - } - - let mut entries_by_path = SumTree::new(); - let mut entries_by_id = SumTree::new(); - entries_by_path.edit(entries_by_path_edits, &()); - entries_by_id.edit(entries_by_id_edits, &()); - - (entries_by_path, entries_by_id) - }) - .await; - - { - let mut snapshot = background_snapshot.lock(); - snapshot.entries_by_path = entries_by_path; - snapshot.entries_by_id = entries_by_id; + cx.background() + .spawn(async move { + while let Some(update) = updates_rx.next().await { + if let Err(error) = background_snapshot.lock().apply_remote_update(update) { + log::error!("error applying worktree update: {}", error); + } snapshot_updated_tx.send(()).await.ok(); } + }) + .detach(); - cx.background() - .spawn(async move { - while let Some(update) = updates_rx.next().await { - if let Err(error) = - background_snapshot.lock().apply_remote_update(update) - { - log::error!("error applying worktree update: {}", error); - } - snapshot_updated_tx.send(()).await.ok(); - } - }) - .detach(); - - cx.spawn(|mut cx| { - let this = worktree_handle.downgrade(); - async move { - while let Some(_) = snapshot_updated_rx.recv().await { - if let Some(this) = this.upgrade(&cx) { - this.update(&mut cx, |this, cx| this.poll_snapshot(cx)); - } else { - break; - } - } + cx.spawn(|mut cx| { + let this = worktree_handle.downgrade(); + async move { + while let Some(_) = snapshot_updated_rx.recv().await { + if let Some(this) = this.upgrade(&cx) { + this.update(&mut cx, |this, cx| this.poll_snapshot(cx)); + } else { + break; } - }) - .detach(); + } } - }); - (worktree_handle, deserialize_task) + }) + .detach(); + + worktree_handle } pub fn as_local(&self) -> Option<&LocalWorktree> { @@ -1015,16 +961,12 @@ impl RemoteWorktree { self.updates_tx.take(); } - pub fn update_from_remote( - &mut self, - envelope: TypedEnvelope, - ) -> Result<()> { + pub fn update_from_remote(&mut self, update: proto::UpdateWorktree) { if let Some(updates_tx) = &self.updates_tx { updates_tx - .unbounded_send(envelope.payload) + .unbounded_send(update) .expect("consumer runs to completion"); } - Ok(()) } fn wait_for_snapshot( @@ -1162,7 +1104,7 @@ impl Snapshot { for entry_id in update.removed_entries { let entry = self .entry_for_id(ProjectEntryId::from_proto(entry_id)) - .ok_or_else(|| anyhow!("unknown entry"))?; + .ok_or_else(|| anyhow!("unknown entry {}", entry_id))?; entries_by_path_edits.push(Edit::Remove(PathKey(entry.path.clone()))); entries_by_id_edits.push(Edit::Remove(entry.id)); } @@ -1306,28 +1248,16 @@ impl LocalSnapshot { } #[cfg(test)] - pub(crate) fn to_proto( - &self, - diagnostic_summaries: &TreeMap, - visible: bool, - ) -> proto::Worktree { + pub(crate) fn build_initial_update(&self, project_id: u64) -> proto::UpdateWorktree { let root_name = self.root_name.clone(); - proto::Worktree { - id: self.id.0 as u64, + proto::UpdateWorktree { + project_id, + worktree_id: self.id().to_proto(), root_name, - entries: self - .entries_by_path - .iter() - .filter(|e| !e.is_ignored) - .map(Into::into) - .collect(), - diagnostic_summaries: diagnostic_summaries - .iter() - .map(|(path, summary)| summary.to_proto(&path.0)) - .collect(), - visible, + updated_entries: self.entries_by_path.iter().map(Into::into).collect(), + removed_entries: Default::default(), scan_id: self.scan_id as u64, - is_complete: true, + is_last_update: true, } } @@ -2709,31 +2639,14 @@ impl<'a> TryFrom<(&'a CharBag, proto::Entry)> for Entry { } } -async fn send_worktree_update( - client: &Arc, - mut update: proto::UpdateWorktree, -) -> Result<()> { +async fn send_worktree_update(client: &Arc, update: proto::UpdateWorktree) -> Result<()> { #[cfg(any(test, feature = "test-support"))] const MAX_CHUNK_SIZE: usize = 2; #[cfg(not(any(test, feature = "test-support")))] const MAX_CHUNK_SIZE: usize = 256; - let mut is_last_update = false; - while !is_last_update { - let chunk_size = cmp::min(update.updated_entries.len(), MAX_CHUNK_SIZE); - let updated_entries = update.updated_entries.drain(..chunk_size).collect(); - is_last_update = update.updated_entries.is_empty(); - client - .request(proto::UpdateWorktree { - project_id: update.project_id, - worktree_id: update.worktree_id, - root_name: update.root_name.clone(), - updated_entries, - removed_entries: mem::take(&mut update.removed_entries), - scan_id: update.scan_id, - is_last_update, - }) - .await?; + for update in proto::split_worktree_update(update, MAX_CHUNK_SIZE) { + client.request(update).await?; } Ok(()) diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index 8291b8ac985477f77286e77f8e88f7c77a5c1fae..e3ca60c2519980d8771d5205517962c904ebbc72 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -168,7 +168,7 @@ message JoinProjectResponse { message Accept { uint32 replica_id = 1; - repeated Worktree worktrees = 2; + repeated WorktreeMetadata worktrees = 2; repeated Collaborator collaborators = 3; repeated LanguageServer language_servers = 4; } @@ -766,16 +766,6 @@ message User { string avatar_url = 3; } -message Worktree { - uint64 id = 1; - string root_name = 2; - repeated Entry entries = 3; - repeated DiagnosticSummary diagnostic_summaries = 4; - bool visible = 5; - uint64 scan_id = 6; - bool is_complete = 7; -} - message File { uint64 worktree_id = 1; optional uint64 entry_id = 2; diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index ecee3709863b4239dcd43b3c2e9c21eb4868880c..6200f37cd2da253cb511241c9eed32a2819b50e6 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -5,6 +5,7 @@ use futures::{SinkExt as _, StreamExt as _}; use prost::Message as _; use serde::Serialize; use std::any::{Any, TypeId}; +use std::{cmp, iter, mem}; use std::{ fmt::Debug, io, @@ -390,6 +391,31 @@ impl From for u128 { } } +pub fn split_worktree_update( + mut message: UpdateWorktree, + max_chunk_size: usize, +) -> impl Iterator { + let mut done = false; + iter::from_fn(move || { + if done { + return None; + } + + let chunk_size = cmp::min(message.updated_entries.len(), max_chunk_size); + let updated_entries = message.updated_entries.drain(..chunk_size).collect(); + done = message.updated_entries.is_empty(); + Some(UpdateWorktree { + project_id: message.project_id, + worktree_id: message.worktree_id, + root_name: message.root_name.clone(), + updated_entries, + removed_entries: mem::take(&mut message.removed_entries), + scan_id: message.scan_id, + is_last_update: done && message.is_last_update, + }) + }) +} + #[cfg(test)] mod tests { use super::*;