Update git repositories to be streamed with their entries

Mikayla Maki and max created

co-authored-by: max <max@zed.dev>

Change summary

crates/rpc/src/proto.rs | 93 +++++++++---------------------------------
1 file changed, 20 insertions(+), 73 deletions(-)

Detailed changes

crates/rpc/src/proto.rs 🔗

@@ -1,6 +1,7 @@
 use super::{entity_messages, messages, request_messages, ConnectionId, TypedEnvelope};
 use anyhow::{anyhow, Result};
 use async_tungstenite::tungstenite::Message as WebSocketMessage;
+use collections::HashMap;
 use futures::{SinkExt as _, StreamExt as _};
 use prost::Message as _;
 use serde::Serialize;
@@ -485,11 +486,15 @@ pub fn split_worktree_update(
     max_chunk_size: usize,
 ) -> impl Iterator<Item = UpdateWorktree> {
     let mut done_files = false;
-    let mut done_statuses = false;
-    let mut repository_index = 0;
-    let mut root_repo_found = false;
+
+    let mut repository_map = message
+        .updated_repositories
+        .into_iter()
+        .map(|repo| (repo.work_directory_id, repo))
+        .collect::<HashMap<_, _>>();
+
     iter::from_fn(move || {
-        if done_files && done_statuses {
+        if done_files {
             return None;
         }
 
@@ -499,25 +504,6 @@ pub fn split_worktree_update(
             .drain(..updated_entries_chunk_size)
             .collect();
 
-        let mut updated_repositories: Vec<_> = Default::default();
-
-        if !root_repo_found {
-            for entry in updated_entries.iter() {
-                if let Some(repo) = message.updated_repositories.get(0) {
-                    if repo.work_directory_id == entry.id {
-                        root_repo_found = true;
-                        updated_repositories.push(RepositoryEntry {
-                            work_directory_id: repo.work_directory_id,
-                            branch: repo.branch.clone(),
-                            removed_repo_paths: Default::default(),
-                            updated_statuses: Default::default(),
-                        });
-                        break;
-                    }
-                }
-            }
-        }
-
         let removed_entries_chunk_size = cmp::min(message.removed_entries.len(), max_chunk_size);
         let removed_entries = message
             .removed_entries
@@ -526,64 +512,25 @@ pub fn split_worktree_update(
 
         done_files = message.updated_entries.is_empty() && message.removed_entries.is_empty();
 
-        // Wait to send repositories until after we've guaranteed that their associated entries
-        // will be read
-        if done_files {
-            let mut total_statuses = 0;
-            while total_statuses < max_chunk_size
-                && repository_index < message.updated_repositories.len()
-            {
-                let updated_statuses_chunk_size = cmp::min(
-                    message.updated_repositories[repository_index]
-                        .updated_statuses
-                        .len(),
-                    max_chunk_size - total_statuses,
-                );
-
-                let updated_statuses: Vec<_> = message.updated_repositories[repository_index]
-                    .updated_statuses
-                    .drain(..updated_statuses_chunk_size)
-                    .collect();
-
-                total_statuses += updated_statuses.len();
-
-                let done_this_repo = message.updated_repositories[repository_index]
-                    .updated_statuses
-                    .is_empty();
-
-                let removed_repo_paths = if done_this_repo {
-                    mem::take(
-                        &mut message.updated_repositories[repository_index].removed_repo_paths,
-                    )
-                } else {
-                    Default::default()
-                };
-
-                updated_repositories.push(RepositoryEntry {
-                    work_directory_id: message.updated_repositories[repository_index]
-                        .work_directory_id,
-                    branch: message.updated_repositories[repository_index]
-                        .branch
-                        .clone(),
-                    updated_statuses,
-                    removed_repo_paths,
-                });
-
-                if done_this_repo {
-                    repository_index += 1;
+        let mut updated_repositories = Vec::new();
+
+        if !repository_map.is_empty() {
+            for entry in &updated_entries {
+                if let Some(repo) = repository_map.remove(&entry.id) {
+                    updated_repositories.push(repo)
                 }
             }
-        } else {
-            Default::default()
-        };
+        }
 
-        let removed_repositories = if done_files && done_statuses {
+        let removed_repositories = if done_files {
             mem::take(&mut message.removed_repositories)
         } else {
             Default::default()
         };
 
-        done_statuses = repository_index >= message.updated_repositories.len();
+        if done_files {
+            updated_repositories.extend(mem::take(&mut repository_map).into_values());
+        }
 
         Some(UpdateWorktree {
             project_id: message.project_id,