git: Ensure no more than 4 blame processes run concurrently for each multibuffer (#44843)

Cole Miller created

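Previously we only awaited up to 4 of the blame futures at a time, but
we still called `Project::blame_buffer` eagerly for every buffer in the
multibuffer. Since that returns a `Task`, whose work is already underway
by the time we await it, all the blame invocations were still launched
concurrently. Now the buffers are processed in chunks of 4, and
`Project::blame_buffer` is only called for a chunk after the previous
chunk has finished.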

Release Notes:

- N/A

Change summary

crates/editor/src/git/blame.rs | 164 +++++++++++++++++++----------------
1 file changed, 91 insertions(+), 73 deletions(-)

Detailed changes

crates/editor/src/git/blame.rs 🔗

@@ -1,7 +1,7 @@
 use crate::Editor;
-use anyhow::Result;
+use anyhow::{Context as _, Result};
 use collections::HashMap;
-use futures::StreamExt;
+
 use git::{
     GitHostingProviderRegistry, GitRemote, Oid,
     blame::{Blame, BlameEntry, ParsedCommitMessage},
@@ -494,84 +494,102 @@ impl GitBlame {
             self.changed_while_blurred = true;
             return;
         }
-        let blame = self.project.update(cx, |project, cx| {
-            let Some(multi_buffer) = self.multi_buffer.upgrade() else {
-                return Vec::new();
-            };
-            multi_buffer
-                .read(cx)
-                .all_buffer_ids()
-                .into_iter()
-                .filter_map(|id| {
-                    let buffer = multi_buffer.read(cx).buffer(id)?;
-                    let snapshot = buffer.read(cx).snapshot();
-                    let buffer_edits = buffer.update(cx, |buffer, _| buffer.subscribe());
-
-                    let blame_buffer = project.blame_buffer(&buffer, None, cx);
-                    let remote_url = project
-                        .git_store()
-                        .read(cx)
-                        .repository_and_path_for_buffer_id(buffer.read(cx).remote_id(), cx)
-                        .and_then(|(repo, _)| {
-                            repo.read(cx)
-                                .remote_upstream_url
-                                .clone()
-                                .or(repo.read(cx).remote_origin_url.clone())
-                        });
-                    Some(
-                        async move { (id, snapshot, buffer_edits, blame_buffer.await, remote_url) },
-                    )
-                })
-                .collect::<Vec<_>>()
-        });
-        let provider_registry = GitHostingProviderRegistry::default_global(cx);
+        let buffers_to_blame = self
+            .multi_buffer
+            .update(cx, |multi_buffer, _| {
+                multi_buffer
+                    .all_buffer_ids()
+                    .into_iter()
+                    .filter_map(|id| Some(multi_buffer.buffer(id)?.downgrade()))
+                    .collect::<Vec<_>>()
+            })
+            .unwrap_or_default();
+        let project = self.project.downgrade();
 
         self.task = cx.spawn(async move |this, cx| {
-            let (result, errors) = cx
-                .background_spawn({
-                    async move {
-                        let blame = futures::stream::iter(blame)
-                            .buffered(4)
-                            .collect::<Vec<_>>()
-                            .await;
-                        let mut res = vec![];
-                        let mut errors = vec![];
-                        for (id, snapshot, buffer_edits, blame, remote_url) in blame {
-                            match blame {
-                                Ok(Some(Blame { entries, messages })) => {
-                                    let entries = build_blame_entry_sum_tree(
-                                        entries,
-                                        snapshot.max_point().row,
-                                    );
-                                    let commit_details = parse_commit_messages(
-                                        messages,
-                                        remote_url,
-                                        provider_registry.clone(),
-                                    )
-                                    .await;
-
-                                    res.push((
+            let mut all_results = Vec::new();
+            let mut all_errors = Vec::new();
+
+            for buffers in buffers_to_blame.chunks(4) {
+                let blame = cx.update(|cx| {
+                    buffers
+                        .iter()
+                        .map(|buffer| {
+                            let buffer = buffer.upgrade().context("buffer was dropped")?;
+                            let project = project.upgrade().context("project was dropped")?;
+                            let id = buffer.read(cx).remote_id();
+                            let snapshot = buffer.read(cx).snapshot();
+                            let buffer_edits = buffer.update(cx, |buffer, _| buffer.subscribe());
+                            let remote_url = project
+                                .read(cx)
+                                .git_store()
+                                .read(cx)
+                                .repository_and_path_for_buffer_id(buffer.read(cx).remote_id(), cx)
+                                .and_then(|(repo, _)| {
+                                    repo.read(cx)
+                                        .remote_upstream_url
+                                        .clone()
+                                        .or(repo.read(cx).remote_origin_url.clone())
+                                });
+                            let blame_buffer = project
+                                .update(cx, |project, cx| project.blame_buffer(&buffer, None, cx));
+                            Ok(async move {
+                                (id, snapshot, buffer_edits, blame_buffer.await, remote_url)
+                            })
+                        })
+                        .collect::<Result<Vec<_>>>()
+                })??;
+                let provider_registry =
+                    cx.update(|cx| GitHostingProviderRegistry::default_global(cx))?;
+                let (results, errors) = cx
+                    .background_spawn({
+                        async move {
+                            let blame = futures::future::join_all(blame).await;
+                            let mut res = vec![];
+                            let mut errors = vec![];
+                            for (id, snapshot, buffer_edits, blame, remote_url) in blame {
+                                match blame {
+                                    Ok(Some(Blame { entries, messages })) => {
+                                        let entries = build_blame_entry_sum_tree(
+                                            entries,
+                                            snapshot.max_point().row,
+                                        );
+                                        let commit_details = parse_commit_messages(
+                                            messages,
+                                            remote_url,
+                                            provider_registry.clone(),
+                                        )
+                                        .await;
+
+                                        res.push((
+                                            id,
+                                            snapshot,
+                                            buffer_edits,
+                                            Some(entries),
+                                            commit_details,
+                                        ));
+                                    }
+                                    Ok(None) => res.push((
                                         id,
                                         snapshot,
                                         buffer_edits,
-                                        Some(entries),
-                                        commit_details,
-                                    ));
-                                }
-                                Ok(None) => {
-                                    res.push((id, snapshot, buffer_edits, None, Default::default()))
+                                        None,
+                                        Default::default(),
+                                    )),
+                                    Err(e) => errors.push(e),
                                 }
-                                Err(e) => errors.push(e),
                             }
+                            (res, errors)
                         }
-                        (res, errors)
-                    }
-                })
-                .await;
+                    })
+                    .await;
+                all_results.extend(results);
+                all_errors.extend(errors)
+            }
 
             this.update(cx, |this, cx| {
                 this.buffers.clear();
-                for (id, snapshot, buffer_edits, entries, commit_details) in result {
+                for (id, snapshot, buffer_edits, entries, commit_details) in all_results {
                     let Some(entries) = entries else {
                         continue;
                     };
@@ -586,11 +604,11 @@ impl GitBlame {
                     );
                 }
                 cx.notify();
-                if !errors.is_empty() {
+                if !all_errors.is_empty() {
                     this.project.update(cx, |_, cx| {
                         if this.user_triggered {
-                            log::error!("failed to get git blame data: {errors:?}");
-                            let notification = errors
+                            log::error!("failed to get git blame data: {all_errors:?}");
+                            let notification = all_errors
                                 .into_iter()
                                 .format_with(",", |e, f| f(&format_args!("{:#}", e)))
                                 .to_string();
@@ -601,7 +619,7 @@ impl GitBlame {
                         } else {
                             // If we weren't triggered by a user, we just log errors in the background, instead of sending
                             // notifications.
-                            log::debug!("failed to get git blame data: {errors:?}");
+                            log::debug!("failed to get git blame data: {all_errors:?}");
                         }
                     })
                 }