Fix race condition in diff base initializaiton (#2513)

Mikayla Maki created

fixes
https://linear.app/zed-industries/issue/Z-1657/diff-markers-in-gutter-do-not-show-up-until-after-first-save

Release Notes:

- Fixes a race condition on buffer initialization that would cause git
diffs to not load.

Change summary

crates/collab/src/tests/integration_tests.rs |   1 
crates/project/src/project.rs                | 120 ++++++++++++++++-----
crates/project/src/worktree.rs               |   1 
3 files changed, 93 insertions(+), 29 deletions(-)

Detailed changes

crates/collab/src/tests/integration_tests.rs 🔗

@@ -2685,6 +2685,7 @@ async fn test_git_branch_name(
     });
 
     let project_remote_c = client_c.build_remote_project(project_id, cx_c).await;
+    deterministic.run_until_parked();
     project_remote_c.read_with(cx_c, |project, cx| {
         assert_branch(Some("branch-2"), project, cx)
     });

crates/project/src/project.rs 🔗

@@ -15,6 +15,7 @@ use copilot::Copilot;
 use futures::{
     channel::mpsc::{self, UnboundedReceiver},
     future::{try_join_all, Shared},
+    stream::FuturesUnordered,
     AsyncWriteExt, Future, FutureExt, StreamExt, TryFutureExt,
 };
 use globset::{Glob, GlobSet, GlobSetBuilder};
@@ -1361,7 +1362,7 @@ impl Project {
             return Task::ready(Ok(existing_buffer));
         }
 
-        let mut loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) {
+        let loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) {
             // If the given path is already being loaded, then wait for that existing
             // task to complete and return the same buffer.
             hash_map::Entry::Occupied(e) => e.get().clone(),
@@ -1392,15 +1393,9 @@ impl Project {
         };
 
         cx.foreground().spawn(async move {
-            loop {
-                if let Some(result) = loading_watch.borrow().as_ref() {
-                    match result {
-                        Ok(buffer) => return Ok(buffer.clone()),
-                        Err(error) => return Err(anyhow!("{}", error)),
-                    }
-                }
-                loading_watch.next().await;
-            }
+            pump_loading_buffer_reciever(loading_watch)
+                .await
+                .map_err(|error| anyhow!("{}", error))
         })
     }
 
@@ -4800,6 +4795,51 @@ impl Project {
     ) {
         debug_assert!(worktree_handle.read(cx).is_local());
 
+        // Setup the pending buffers
+        let future_buffers = self
+            .loading_buffers_by_path
+            .iter()
+            .filter_map(|(path, receiver)| {
+                let path = &path.path;
+                let (work_directory, repo) = repos
+                    .iter()
+                    .find(|(work_directory, _)| path.starts_with(work_directory))?;
+
+                let repo_relative_path = path.strip_prefix(work_directory).log_err()?;
+
+                let receiver = receiver.clone();
+                let repo_ptr = repo.repo_ptr.clone();
+                let repo_relative_path = repo_relative_path.to_owned();
+                Some(async move {
+                    pump_loading_buffer_reciever(receiver)
+                        .await
+                        .ok()
+                        .map(|buffer| (buffer, repo_relative_path, repo_ptr))
+                })
+            })
+            .collect::<FuturesUnordered<_>>()
+            .filter_map(|result| async move {
+                let (buffer_handle, repo_relative_path, repo_ptr) = result?;
+
+                let lock = repo_ptr.lock();
+                lock.load_index_text(&repo_relative_path)
+                    .map(|diff_base| (diff_base, buffer_handle))
+            });
+
+        let update_diff_base_fn = update_diff_base(self);
+        cx.spawn(|_, mut cx| async move {
+            let diff_base_tasks = cx
+                .background()
+                .spawn(future_buffers.collect::<Vec<_>>())
+                .await;
+
+            for (diff_base, buffer) in diff_base_tasks.into_iter() {
+                update_diff_base_fn(Some(diff_base), buffer, &mut cx);
+            }
+        })
+        .detach();
+
+        // And the current buffers
         for (_, buffer) in &self.opened_buffers {
             if let Some(buffer) = buffer.upgrade(cx) {
                 let file = match File::from_dyn(buffer.read(cx).file()) {
@@ -4819,18 +4859,17 @@ impl Project {
                     .find(|(work_directory, _)| path.starts_with(work_directory))
                 {
                     Some(repo) => repo.clone(),
-                    None => return,
+                    None => continue,
                 };
 
                 let relative_repo = match path.strip_prefix(work_directory).log_err() {
                     Some(relative_repo) => relative_repo.to_owned(),
-                    None => return,
+                    None => continue,
                 };
 
                 drop(worktree);
 
-                let remote_id = self.remote_id();
-                let client = self.client.clone();
+                let update_diff_base_fn = update_diff_base(self);
                 let git_ptr = repo.repo_ptr.clone();
                 let diff_base_task = cx
                     .background()
@@ -4838,21 +4877,7 @@ impl Project {
 
                 cx.spawn(|_, mut cx| async move {
                     let diff_base = diff_base_task.await;
-
-                    let buffer_id = buffer.update(&mut cx, |buffer, cx| {
-                        buffer.set_diff_base(diff_base.clone(), cx);
-                        buffer.remote_id()
-                    });
-
-                    if let Some(project_id) = remote_id {
-                        client
-                            .send(proto::UpdateDiffBase {
-                                project_id,
-                                buffer_id: buffer_id as u64,
-                                diff_base,
-                            })
-                            .log_err();
-                    }
+                    update_diff_base_fn(diff_base, buffer, &mut cx);
                 })
                 .detach();
             }
@@ -6739,3 +6764,40 @@ impl Item for Buffer {
         })
     }
 }
+
+async fn pump_loading_buffer_reciever(
+    mut receiver: postage::watch::Receiver<Option<Result<ModelHandle<Buffer>, Arc<anyhow::Error>>>>,
+) -> Result<ModelHandle<Buffer>, Arc<anyhow::Error>> {
+    loop {
+        if let Some(result) = receiver.borrow().as_ref() {
+            match result {
+                Ok(buffer) => return Ok(buffer.to_owned()),
+                Err(e) => return Err(e.to_owned()),
+            }
+        }
+        receiver.next().await;
+    }
+}
+
+fn update_diff_base(
+    project: &Project,
+) -> impl Fn(Option<String>, ModelHandle<Buffer>, &mut AsyncAppContext) {
+    let remote_id = project.remote_id();
+    let client = project.client().clone();
+    move |diff_base, buffer, cx| {
+        let buffer_id = buffer.update(cx, |buffer, cx| {
+            buffer.set_diff_base(diff_base.clone(), cx);
+            buffer.remote_id()
+        });
+
+        if let Some(project_id) = remote_id {
+            client
+                .send(proto::UpdateDiffBase {
+                    project_id,
+                    buffer_id: buffer_id as u64,
+                    diff_base,
+                })
+                .log_err();
+        }
+    }
+}

crates/project/src/worktree.rs 🔗

@@ -800,6 +800,7 @@ impl LocalWorktree {
     fn set_snapshot(&mut self, new_snapshot: LocalSnapshot, cx: &mut ModelContext<Worktree>) {
         let updated_repos =
             self.changed_repos(&self.git_repositories, &new_snapshot.git_repositories);
+
         self.snapshot = new_snapshot;
 
         if let Some(share) = self.share.as_mut() {