Wait for buffer if it doesn't exist when deserializing a reference

Nathan Sobo and Max Brunsfeld created

Co-Authored-By: Max Brunsfeld <maxbrunsfeld@gmail.com>

Change summary

crates/project/src/project.rs | 150 +++++++++++++++++++++---------------
crates/server/src/rpc.rs      |   2 
2 files changed, 89 insertions(+), 63 deletions(-)

Detailed changes

crates/project/src/project.rs 🔗

@@ -10,7 +10,7 @@ use futures::Future;
 use fuzzy::{PathMatch, PathMatchCandidate, PathMatchCandidateSet};
 use gpui::{
     AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task,
-    WeakModelHandle,
+    UpgradeModelHandle, WeakModelHandle,
 };
 use language::{
     point_from_lsp,
@@ -20,7 +20,7 @@ use language::{
     ToLspPosition, ToOffset, ToPointUtf16, Transaction,
 };
 use lsp::{DiagnosticSeverity, LanguageServer};
-use postage::{prelude::Stream, watch};
+use postage::{broadcast, prelude::Stream, sink::Sink, watch};
 use smol::block_on;
 use std::{
     convert::TryInto,
@@ -47,6 +47,7 @@ pub struct Project {
     subscriptions: Vec<client::Subscription>,
     language_servers_with_diagnostics_running: isize,
     open_buffers: HashMap<usize, OpenBuffer>,
+    opened_buffer: broadcast::Sender<()>,
     loading_buffers: HashMap<
         ProjectPath,
         postage::watch::Receiver<Option<Result<ModelHandle<Buffer>, Arc<anyhow::Error>>>>,
@@ -221,6 +222,7 @@ impl Project {
                     remote_id_rx,
                     _maintain_remote_id_task,
                 },
+                opened_buffer: broadcast::channel(1).0,
                 subscriptions: Vec::new(),
                 active_entry: None,
                 languages,
@@ -278,6 +280,7 @@ impl Project {
                 worktrees: Vec::new(),
                 open_buffers: Default::default(),
                 loading_buffers: Default::default(),
+                opened_buffer: broadcast::channel(1).0,
                 shared_buffers: Default::default(),
                 active_entry: None,
                 collaborators,
@@ -631,6 +634,7 @@ impl Project {
                 .await?;
             let buffer = response.buffer.ok_or_else(|| anyhow!("missing buffer"))?;
             this.update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx))
+                .await
         })
     }
 
@@ -1271,29 +1275,27 @@ impl Project {
             };
             cx.spawn(|this, mut cx| async move {
                 let response = client.request(request).await?;
-                this.update(&mut cx, |this, cx| {
-                    let mut definitions = Vec::new();
-                    for definition in response.definitions {
-                        let target_buffer = this.deserialize_buffer(
-                            definition.buffer.ok_or_else(|| anyhow!("missing buffer"))?,
-                            cx,
-                        )?;
-                        let target_start = definition
-                            .target_start
-                            .and_then(deserialize_anchor)
-                            .ok_or_else(|| anyhow!("missing target start"))?;
-                        let target_end = definition
-                            .target_end
-                            .and_then(deserialize_anchor)
-                            .ok_or_else(|| anyhow!("missing target end"))?;
-                        definitions.push(Definition {
-                            target_buffer,
-                            target_range: target_start..target_end,
-                        })
-                    }
+                let mut definitions = Vec::new();
+                for definition in response.definitions {
+                    let buffer = definition.buffer.ok_or_else(|| anyhow!("missing buffer"))?;
+                    let target_buffer = this
+                        .update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx))
+                        .await?;
+                    let target_start = definition
+                        .target_start
+                        .and_then(deserialize_anchor)
+                        .ok_or_else(|| anyhow!("missing target start"))?;
+                    let target_end = definition
+                        .target_end
+                        .and_then(deserialize_anchor)
+                        .ok_or_else(|| anyhow!("missing target end"))?;
+                    definitions.push(Definition {
+                        target_buffer,
+                        target_range: target_start..target_end,
+                    })
+                }
 
-                    Ok(definitions)
-                })
+                Ok(definitions)
             })
         } else {
             Task::ready(Ok(Default::default()))
@@ -2531,20 +2533,15 @@ impl Project {
         push_to_history: bool,
         cx: &mut ModelContext<Self>,
     ) -> Task<Result<ProjectTransaction>> {
-        let mut project_transaction = ProjectTransaction::default();
-        for (buffer, transaction) in message.buffers.into_iter().zip(message.transactions) {
-            let buffer = match self.deserialize_buffer(buffer, cx) {
-                Ok(buffer) => buffer,
-                Err(error) => return Task::ready(Err(error)),
-            };
-            let transaction = match language::proto::deserialize_transaction(transaction) {
-                Ok(transaction) => transaction,
-                Err(error) => return Task::ready(Err(error)),
-            };
-            project_transaction.0.insert(buffer, transaction);
-        }
-
-        cx.spawn_weak(|_, mut cx| async move {
+        cx.spawn(|this, mut cx| async move {
+            let mut project_transaction = ProjectTransaction::default();
+            for (buffer, transaction) in message.buffers.into_iter().zip(message.transactions) {
+                let buffer = this
+                    .update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx))
+                    .await?;
+                let transaction = language::proto::deserialize_transaction(transaction)?;
+                project_transaction.0.insert(buffer, transaction);
+            }
             for (buffer, transaction) in &project_transaction.0 {
                 buffer
                     .update(&mut cx, |buffer, _| {
@@ -2588,33 +2585,60 @@ impl Project {
         &mut self,
         buffer: proto::Buffer,
         cx: &mut ModelContext<Self>,
-    ) -> Result<ModelHandle<Buffer>> {
-        match buffer.variant.ok_or_else(|| anyhow!("missing buffer"))? {
-            proto::buffer::Variant::Id(id) => self
-                .open_buffers
-                .get(&(id as usize))
-                .and_then(|buffer| buffer.upgrade(cx))
-                .ok_or_else(|| anyhow!("no buffer exists for id {}", id)),
-            proto::buffer::Variant::State(mut buffer) => {
-                let mut buffer_worktree = None;
-                let mut buffer_file = None;
-                if let Some(file) = buffer.file.take() {
-                    let worktree_id = WorktreeId::from_proto(file.worktree_id);
-                    let worktree = self
-                        .worktree_for_id(worktree_id, cx)
-                        .ok_or_else(|| anyhow!("no worktree found for id {}", file.worktree_id))?;
-                    buffer_file = Some(Box::new(File::from_proto(file, worktree.clone(), cx)?)
-                        as Box<dyn language::File>);
-                    buffer_worktree = Some(worktree);
+    ) -> Task<Result<ModelHandle<Buffer>>> {
+        let replica_id = self.replica_id();
+
+        let mut opened_buffer_tx = self.opened_buffer.clone();
+        let mut opened_buffer_rx = self.opened_buffer.subscribe();
+        cx.spawn(|this, mut cx| async move {
+            match buffer.variant.ok_or_else(|| anyhow!("missing buffer"))? {
+                proto::buffer::Variant::Id(id) => {
+                    let buffer = loop {
+                        let buffer = this.read_with(&cx, |this, cx| {
+                            this.open_buffers
+                                .get(&(id as usize))
+                                .and_then(|buffer| buffer.upgrade(cx))
+                        });
+                        if let Some(buffer) = buffer {
+                            break buffer;
+                        }
+                        opened_buffer_rx
+                            .recv()
+                            .await
+                            .ok_or_else(|| anyhow!("project dropped while waiting for buffer"))?;
+                    };
+                    Ok(buffer)
                 }
+                proto::buffer::Variant::State(mut buffer) => {
+                    let mut buffer_worktree = None;
+                    let mut buffer_file = None;
+                    if let Some(file) = buffer.file.take() {
+                        this.read_with(&cx, |this, cx| {
+                            let worktree_id = WorktreeId::from_proto(file.worktree_id);
+                            let worktree =
+                                this.worktree_for_id(worktree_id, cx).ok_or_else(|| {
+                                    anyhow!("no worktree found for id {}", file.worktree_id)
+                                })?;
+                            buffer_file =
+                                Some(Box::new(File::from_proto(file, worktree.clone(), cx)?)
+                                    as Box<dyn language::File>);
+                            buffer_worktree = Some(worktree);
+                            Ok::<_, anyhow::Error>(())
+                        })?;
+                    }
 
-                let buffer = cx.add_model(|cx| {
-                    Buffer::from_proto(self.replica_id(), buffer, buffer_file, cx).unwrap()
-                });
-                self.register_buffer(&buffer, buffer_worktree.as_ref(), cx)?;
-                Ok(buffer)
+                    let buffer = cx.add_model(|cx| {
+                        Buffer::from_proto(replica_id, buffer, buffer_file, cx).unwrap()
+                    });
+                    this.update(&mut cx, |this, cx| {
+                        this.register_buffer(&buffer, buffer_worktree.as_ref(), cx)
+                    })?;
+
+                    let _ = opened_buffer_tx.send(()).await;
+                    Ok(buffer)
+                }
             }
-        }
+        })
     }
 
     async fn handle_close_buffer(
@@ -2735,7 +2759,7 @@ impl WorktreeHandle {
 }
 
 impl OpenBuffer {
-    pub fn upgrade(&self, cx: &AppContext) -> Option<ModelHandle<Buffer>> {
+    pub fn upgrade(&self, cx: &impl UpgradeModelHandle) -> Option<ModelHandle<Buffer>> {
         match self {
             OpenBuffer::Loaded(handle) => handle.upgrade(cx),
             OpenBuffer::Operations(_) => None,

crates/server/src/rpc.rs 🔗

@@ -2660,6 +2660,7 @@ mod tests {
         // Set up a fake language server.
         let (language_server_config, mut fake_language_server) =
             LanguageServerConfig::fake(&cx_a).await;
+
         Arc::get_mut(&mut lang_registry)
             .unwrap()
             .add(Arc::new(Language::new(
@@ -2687,6 +2688,7 @@ mod tests {
                 cx,
             )
         });
+
         let (worktree_a, _) = project_a
             .update(&mut cx_a, |p, cx| {
                 p.find_or_create_local_worktree("/root", false, cx)