Split worktree updates and only send 256 entries at a time

Antonio Scandurra created

Change summary

crates/collab/src/rpc.rs       |   2 
crates/collab/src/rpc/store.rs |   3 
crates/project/src/project.rs  |   1 
crates/project/src/worktree.rs | 150 +++++++++++++++++++----------------
crates/rpc/proto/zed.proto     |   2 
5 files changed, 88 insertions(+), 70 deletions(-)

Detailed changes

crates/collab/src/rpc.rs 🔗

@@ -804,6 +804,7 @@ impl Server {
                             .collect(),
                         visible: worktree.visible,
                         scan_id: shared_worktree.scan_id,
+                        is_complete: worktree.is_complete,
                     })
                 })
                 .collect::<Vec<_>>();
@@ -963,6 +964,7 @@ impl Server {
                 &request.payload.removed_entries,
                 &request.payload.updated_entries,
                 request.payload.scan_id,
+                request.payload.is_last_update,
             )?;
             (connection_ids, metadata_changed, extension_counts.clone())
         };

crates/collab/src/rpc/store.rs 🔗

@@ -62,6 +62,7 @@ pub struct Worktree {
     #[serde(skip)]
     pub diagnostic_summaries: BTreeMap<PathBuf, proto::DiagnosticSummary>,
     pub scan_id: u64,
+    pub is_complete: bool,
 }
 
 #[derive(Default)]
@@ -615,6 +616,7 @@ impl Store {
         removed_entries: &[u64],
         updated_entries: &[proto::Entry],
         scan_id: u64,
+        is_last_update: bool,
     ) -> Result<(Vec<ConnectionId>, bool, HashMap<String, usize>)> {
         let project = self.write_project(project_id, connection_id)?;
         let connection_ids = project.connection_ids();
@@ -657,6 +659,7 @@ impl Store {
         }
 
         worktree.scan_id = scan_id;
+        worktree.is_complete = is_last_update;
         Ok((
             connection_ids,
             metadata_changed,

crates/project/src/project.rs 🔗

@@ -4447,6 +4447,7 @@ impl Project {
                         diagnostic_summaries: Default::default(),
                         visible: worktree.visible,
                         scan_id: 0,
+                        is_complete: false,
                     };
                     let (worktree, load_task) =
                         Worktree::remote(remote_id, replica_id, worktree, client.clone(), cx);

crates/project/src/worktree.rs 🔗

@@ -83,7 +83,7 @@ pub struct RemoteWorktree {
     project_id: u64,
     client: Arc<Client>,
     updates_tx: Option<UnboundedSender<proto::UpdateWorktree>>,
-    last_scan_id_rx: watch::Receiver<usize>,
+    snapshot_updated_rx: watch::Receiver<()>,
     replica_id: ReplicaId,
     diagnostic_summaries: TreeMap<PathKey, DiagnosticSummary>,
     visible: bool,
@@ -97,6 +97,7 @@ pub struct Snapshot {
     entries_by_path: SumTree<Entry>,
     entries_by_id: SumTree<PathEntry>,
     scan_id: usize,
+    is_complete: bool,
 }
 
 #[derive(Clone)]
@@ -191,12 +192,12 @@ impl Worktree {
             entries_by_path: Default::default(),
             entries_by_id: Default::default(),
             scan_id: worktree.scan_id as usize,
+            is_complete: worktree.is_complete,
         };
 
         let (updates_tx, mut updates_rx) = mpsc::unbounded();
         let background_snapshot = Arc::new(Mutex::new(snapshot.clone()));
         let (mut snapshot_updated_tx, mut snapshot_updated_rx) = watch::channel();
-        let (mut last_scan_id_tx, last_scan_id_rx) = watch::channel_with(worktree.scan_id as usize);
         let worktree_handle = cx.add_model(|_: &mut ModelContext<Worktree>| {
             Worktree::Remote(RemoteWorktree {
                 project_id: project_remote_id,
@@ -204,7 +205,7 @@ impl Worktree {
                 snapshot: snapshot.clone(),
                 background_snapshot: background_snapshot.clone(),
                 updates_tx: Some(updates_tx),
-                last_scan_id_rx,
+                snapshot_updated_rx: snapshot_updated_rx.clone(),
                 client: client.clone(),
                 diagnostic_summaries: TreeMap::from_ordered_entries(
                     worktree.diagnostic_summaries.into_iter().map(|summary| {
@@ -279,11 +280,7 @@ impl Worktree {
                     async move {
                         while let Some(_) = snapshot_updated_rx.recv().await {
                             if let Some(this) = this.upgrade(&cx) {
-                                this.update(&mut cx, |this, cx| {
-                                    this.poll_snapshot(cx);
-                                    let this = this.as_remote_mut().unwrap();
-                                    *last_scan_id_tx.borrow_mut() = this.snapshot.scan_id;
-                                });
+                                this.update(&mut cx, |this, cx| this.poll_snapshot(cx));
                             } else {
                                 break;
                             }
@@ -450,6 +447,7 @@ impl LocalWorktree {
                     entries_by_path: Default::default(),
                     entries_by_id: Default::default(),
                     scan_id: 0,
+                    is_complete: true,
                 },
             };
             if let Some(metadata) = metadata {
@@ -910,22 +908,20 @@ impl LocalWorktree {
                 async move {
                     let mut prev_snapshot = match snapshots_to_send_rx.recv().await {
                         Ok(snapshot) => {
-                            if let Err(error) = rpc
-                                .request(proto::UpdateWorktree {
-                                    project_id,
-                                    worktree_id,
-                                    root_name: snapshot.root_name().to_string(),
-                                    updated_entries: snapshot
-                                        .entries_by_path
-                                        .iter()
-                                        .filter(|e| !e.is_ignored)
-                                        .map(Into::into)
-                                        .collect(),
-                                    removed_entries: Default::default(),
-                                    scan_id: snapshot.scan_id as u64,
-                                })
-                                .await
-                            {
+                            let update = proto::UpdateWorktree {
+                                project_id,
+                                worktree_id,
+                                root_name: snapshot.root_name().to_string(),
+                                updated_entries: snapshot
+                                    .entries_by_path
+                                    .iter()
+                                    .map(Into::into)
+                                    .collect(),
+                                removed_entries: Default::default(),
+                                scan_id: snapshot.scan_id as u64,
+                                is_last_update: true,
+                            };
+                            if let Err(error) = send_worktree_update(&rpc, update).await {
                                 let _ = share_tx.send(Err(error));
                                 return Err(anyhow!("failed to send initial update worktree"));
                             } else {
@@ -947,48 +943,16 @@ impl LocalWorktree {
                         })?;
                     }
 
-                    // Stream ignored entries in chunks.
-                    {
-                        let mut ignored_entries = prev_snapshot
-                            .entries_by_path
-                            .iter()
-                            .filter(|e| e.is_ignored);
-                        let mut ignored_entries_to_send = Vec::new();
-                        loop {
-                            #[cfg(any(test, feature = "test-support"))]
-                            const CHUNK_SIZE: usize = 2;
-                            #[cfg(not(any(test, feature = "test-support")))]
-                            const CHUNK_SIZE: usize = 256;
-
-                            let entry = ignored_entries.next();
-                            if ignored_entries_to_send.len() >= CHUNK_SIZE || entry.is_none() {
-                                rpc.request(proto::UpdateWorktree {
-                                    project_id,
-                                    worktree_id,
-                                    root_name: prev_snapshot.root_name().to_string(),
-                                    updated_entries: mem::take(&mut ignored_entries_to_send),
-                                    removed_entries: Default::default(),
-                                    scan_id: prev_snapshot.scan_id as u64,
-                                })
-                                .await?;
-                            }
-
-                            if let Some(entry) = entry {
-                                ignored_entries_to_send.push(entry.into());
-                            } else {
-                                break;
-                            }
-                        }
-                    }
-
                     while let Ok(mut snapshot) = snapshots_to_send_rx.recv().await {
                         while let Ok(newer_snapshot) = snapshots_to_send_rx.try_recv() {
                             snapshot = newer_snapshot;
                         }
 
-                        let message =
-                            snapshot.build_update(&prev_snapshot, project_id, worktree_id, true);
-                        rpc.request(message).await?;
+                        send_worktree_update(
+                            &rpc,
+                            snapshot.build_update(&prev_snapshot, project_id, worktree_id, true),
+                        )
+                        .await?;
                         prev_snapshot = snapshot;
                     }
 
@@ -1063,15 +1027,25 @@ impl RemoteWorktree {
         Ok(())
     }
 
-    fn wait_for_snapshot(&self, scan_id: usize) -> impl Future<Output = ()> {
-        let mut rx = self.last_scan_id_rx.clone();
-        async move {
-            while let Some(applied_scan_id) = rx.next().await {
-                if applied_scan_id >= scan_id {
-                    return;
+    fn wait_for_snapshot(
+        &self,
+        scan_id: usize,
+        cx: &mut ModelContext<Worktree>,
+    ) -> Task<Option<()>> {
+        let mut rx = self.snapshot_updated_rx.clone();
+        cx.spawn_weak(|worktree, cx| async move {
+            while rx.recv().await.is_some() {
+                let snapshot = worktree
+                    .upgrade(&cx)?
+                    .read_with(&cx, |worktree, _| worktree.snapshot());
+                if snapshot.scan_id > scan_id
+                    || (snapshot.scan_id == scan_id && snapshot.is_complete)
+                {
+                    break;
                 }
             }
-        }
+            None
+        })
     }
 
     pub fn update_diagnostic_summary(
@@ -1098,7 +1072,7 @@ impl RemoteWorktree {
         scan_id: usize,
         cx: &mut ModelContext<Worktree>,
     ) -> Task<Result<Entry>> {
-        let wait_for_snapshot = self.wait_for_snapshot(scan_id);
+        let wait_for_snapshot = self.wait_for_snapshot(scan_id, cx);
         cx.spawn(|this, mut cx| async move {
             wait_for_snapshot.await;
             this.update(&mut cx, |worktree, _| {
@@ -1117,7 +1091,7 @@ impl RemoteWorktree {
         scan_id: usize,
         cx: &mut ModelContext<Worktree>,
     ) -> Task<Result<()>> {
-        let wait_for_snapshot = self.wait_for_snapshot(scan_id);
+        let wait_for_snapshot = self.wait_for_snapshot(scan_id, cx);
         cx.spawn(|this, mut cx| async move {
             wait_for_snapshot.await;
             this.update(&mut cx, |worktree, _| {
@@ -1210,6 +1184,7 @@ impl Snapshot {
         self.entries_by_path.edit(entries_by_path_edits, &());
         self.entries_by_id.edit(entries_by_id_edits, &());
         self.scan_id = update.scan_id as usize;
+        self.is_complete = update.is_last_update;
 
         Ok(())
     }
@@ -1352,6 +1327,7 @@ impl LocalSnapshot {
                 .collect(),
             visible,
             scan_id: self.scan_id as u64,
+            is_complete: true,
         }
     }
 
@@ -1418,6 +1394,7 @@ impl LocalSnapshot {
             updated_entries,
             removed_entries,
             scan_id: self.scan_id as u64,
+            is_last_update: true,
         }
     }
 
@@ -2732,6 +2709,38 @@ impl<'a> TryFrom<(&'a CharBag, proto::Entry)> for Entry {
     }
 }
 
+async fn send_worktree_update(
+    client: &Arc<Client>,
+    mut update: proto::UpdateWorktree,
+) -> Result<()> {
+    #[cfg(any(test, feature = "test-support"))]
+    const MAX_CHUNK_SIZE: usize = 2;
+    #[cfg(not(any(test, feature = "test-support")))]
+    const MAX_CHUNK_SIZE: usize = 256;
+
+    loop {
+        let chunk_size = cmp::min(update.updated_entries.len(), MAX_CHUNK_SIZE);
+        let updated_entries = update.updated_entries.drain(..chunk_size).collect();
+        let is_last_update = update.updated_entries.is_empty();
+        client
+            .request(proto::UpdateWorktree {
+                project_id: update.project_id,
+                worktree_id: update.worktree_id,
+                root_name: update.root_name.clone(),
+                updated_entries,
+                removed_entries: mem::take(&mut update.removed_entries),
+                scan_id: update.scan_id,
+                is_last_update,
+            })
+            .await?;
+        if is_last_update {
+            break;
+        }
+    }
+
+    Ok(())
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -2941,6 +2950,7 @@ mod tests {
                 root_name: Default::default(),
                 root_char_bag: Default::default(),
                 scan_id: 0,
+                is_complete: true,
             },
         };
         initial_snapshot.insert_entry(

crates/rpc/proto/zed.proto 🔗

@@ -195,6 +195,7 @@ message UpdateWorktree {
     repeated Entry updated_entries = 4;
     repeated uint64 removed_entries = 5;
     uint64 scan_id = 6;
+    bool is_last_update = 7;
 }
 
 message CreateProjectEntry {
@@ -772,6 +773,7 @@ message Worktree {
     repeated DiagnosticSummary diagnostic_summaries = 4;
     bool visible = 5;
     uint64 scan_id = 6;
+    bool is_complete = 7;
 }
 
 message File {