From 28ba49b47b118553a9f2f59112569c701d4c8562 Mon Sep 17 00:00:00 2001 From: Nathan Sobo Date: Mon, 14 Feb 2022 15:55:37 -0700 Subject: [PATCH] Wait for buffer if it doesn't exist when deserializing a reference Co-Authored-By: Max Brunsfeld --- crates/project/src/project.rs | 150 ++++++++++++++++++++-------------- crates/server/src/rpc.rs | 2 + 2 files changed, 89 insertions(+), 63 deletions(-) diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 40df7cfb9ea00958344120f55dbdf9ecb57c15bc..eff003479f8c451e32f0e82c5ccc58b5aec8709a 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -10,7 +10,7 @@ use futures::Future; use fuzzy::{PathMatch, PathMatchCandidate, PathMatchCandidateSet}; use gpui::{ AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task, - WeakModelHandle, + UpgradeModelHandle, WeakModelHandle, }; use language::{ point_from_lsp, @@ -20,7 +20,7 @@ use language::{ ToLspPosition, ToOffset, ToPointUtf16, Transaction, }; use lsp::{DiagnosticSeverity, LanguageServer}; -use postage::{prelude::Stream, watch}; +use postage::{broadcast, prelude::Stream, sink::Sink, watch}; use smol::block_on; use std::{ convert::TryInto, @@ -47,6 +47,7 @@ pub struct Project { subscriptions: Vec, language_servers_with_diagnostics_running: isize, open_buffers: HashMap, + opened_buffer: broadcast::Sender<()>, loading_buffers: HashMap< ProjectPath, postage::watch::Receiver, Arc>>>, @@ -221,6 +222,7 @@ impl Project { remote_id_rx, _maintain_remote_id_task, }, + opened_buffer: broadcast::channel(1).0, subscriptions: Vec::new(), active_entry: None, languages, @@ -278,6 +280,7 @@ impl Project { worktrees: Vec::new(), open_buffers: Default::default(), loading_buffers: Default::default(), + opened_buffer: broadcast::channel(1).0, shared_buffers: Default::default(), active_entry: None, collaborators, @@ -631,6 +634,7 @@ impl Project { .await?; let buffer = response.buffer.ok_or_else(|| anyhow!("missing buffer"))?; this.update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx)) + .await }) } @@ -1271,29 +1275,27 @@ impl Project { }; cx.spawn(|this, mut cx| async move { let response = client.request(request).await?; - this.update(&mut cx, |this, cx| { - let mut definitions = Vec::new(); - for definition in response.definitions { - let target_buffer = this.deserialize_buffer( - definition.buffer.ok_or_else(|| anyhow!("missing buffer"))?, - cx, - )?; - let target_start = definition - .target_start - .and_then(deserialize_anchor) - .ok_or_else(|| anyhow!("missing target start"))?; - let target_end = definition - .target_end - .and_then(deserialize_anchor) - .ok_or_else(|| anyhow!("missing target end"))?; - definitions.push(Definition { - target_buffer, - target_range: target_start..target_end, - }) - } + let mut definitions = Vec::new(); + for definition in response.definitions { + let buffer = definition.buffer.ok_or_else(|| anyhow!("missing buffer"))?; + let target_buffer = this + .update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx)) + .await?; + let target_start = definition + .target_start + .and_then(deserialize_anchor) + .ok_or_else(|| anyhow!("missing target start"))?; + let target_end = definition + .target_end + .and_then(deserialize_anchor) + .ok_or_else(|| anyhow!("missing target end"))?; + definitions.push(Definition { + target_buffer, + target_range: target_start..target_end, + }) + } - Ok(definitions) - }) + Ok(definitions) }) } else { Task::ready(Ok(Default::default())) @@ -2531,20 +2533,15 @@ impl Project { push_to_history: bool, cx: &mut ModelContext, ) -> Task> { - let mut project_transaction = ProjectTransaction::default(); - for (buffer, transaction) in message.buffers.into_iter().zip(message.transactions) { - let buffer = match self.deserialize_buffer(buffer, cx) { - Ok(buffer) => buffer, - Err(error) => return Task::ready(Err(error)), - }; - let transaction = match language::proto::deserialize_transaction(transaction) { - Ok(transaction) => transaction, - Err(error) => return Task::ready(Err(error)), - }; - project_transaction.0.insert(buffer, transaction); - } - - cx.spawn_weak(|_, mut cx| async move { + cx.spawn(|this, mut cx| async move { + let mut project_transaction = ProjectTransaction::default(); + for (buffer, transaction) in message.buffers.into_iter().zip(message.transactions) { + let buffer = this + .update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx)) + .await?; + let transaction = language::proto::deserialize_transaction(transaction)?; + project_transaction.0.insert(buffer, transaction); + } for (buffer, transaction) in &project_transaction.0 { buffer .update(&mut cx, |buffer, _| { @@ -2588,33 +2585,60 @@ impl Project { &mut self, buffer: proto::Buffer, cx: &mut ModelContext, - ) -> Result> { - match buffer.variant.ok_or_else(|| anyhow!("missing buffer"))? { - proto::buffer::Variant::Id(id) => self - .open_buffers - .get(&(id as usize)) - .and_then(|buffer| buffer.upgrade(cx)) - .ok_or_else(|| anyhow!("no buffer exists for id {}", id)), - proto::buffer::Variant::State(mut buffer) => { - let mut buffer_worktree = None; - let mut buffer_file = None; - if let Some(file) = buffer.file.take() { - let worktree_id = WorktreeId::from_proto(file.worktree_id); - let worktree = self - .worktree_for_id(worktree_id, cx) - .ok_or_else(|| anyhow!("no worktree found for id {}", file.worktree_id))?; - buffer_file = Some(Box::new(File::from_proto(file, worktree.clone(), cx)?) - as Box); - buffer_worktree = Some(worktree); + ) -> Task>> { + let replica_id = self.replica_id(); + + let mut opened_buffer_tx = self.opened_buffer.clone(); + let mut opened_buffer_rx = self.opened_buffer.subscribe(); + cx.spawn(|this, mut cx| async move { + match buffer.variant.ok_or_else(|| anyhow!("missing buffer"))? { + proto::buffer::Variant::Id(id) => { + let buffer = loop { + let buffer = this.read_with(&cx, |this, cx| { + this.open_buffers + .get(&(id as usize)) + .and_then(|buffer| buffer.upgrade(cx)) + }); + if let Some(buffer) = buffer { + break buffer; + } + opened_buffer_rx + .recv() + .await + .ok_or_else(|| anyhow!("project dropped while waiting for buffer"))?; + }; + Ok(buffer) } + proto::buffer::Variant::State(mut buffer) => { + let mut buffer_worktree = None; + let mut buffer_file = None; + if let Some(file) = buffer.file.take() { + this.read_with(&cx, |this, cx| { + let worktree_id = WorktreeId::from_proto(file.worktree_id); + let worktree = + this.worktree_for_id(worktree_id, cx).ok_or_else(|| { + anyhow!("no worktree found for id {}", file.worktree_id) + })?; + buffer_file = + Some(Box::new(File::from_proto(file, worktree.clone(), cx)?) + as Box); + buffer_worktree = Some(worktree); + Ok::<_, anyhow::Error>(()) + })?; + } - let buffer = cx.add_model(|cx| { - Buffer::from_proto(self.replica_id(), buffer, buffer_file, cx).unwrap() - }); - self.register_buffer(&buffer, buffer_worktree.as_ref(), cx)?; - Ok(buffer) + let buffer = cx.add_model(|cx| { + Buffer::from_proto(replica_id, buffer, buffer_file, cx).unwrap() + }); + this.update(&mut cx, |this, cx| { + this.register_buffer(&buffer, buffer_worktree.as_ref(), cx) + })?; + + let _ = opened_buffer_tx.send(()).await; + Ok(buffer) + } } - } + }) } async fn handle_close_buffer( @@ -2735,7 +2759,7 @@ impl WorktreeHandle { } impl OpenBuffer { - pub fn upgrade(&self, cx: &AppContext) -> Option> { + pub fn upgrade(&self, cx: &impl UpgradeModelHandle) -> Option> { match self { OpenBuffer::Loaded(handle) => handle.upgrade(cx), OpenBuffer::Operations(_) => None, diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index 76123f75bcc75742e48bfdf3728dfe79e6d2f4c2..cca6d6a4fa5ae115fa8268ac900ca8e01f51c801 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -2660,6 +2660,7 @@ mod tests { // Set up a fake language server. let (language_server_config, mut fake_language_server) = LanguageServerConfig::fake(&cx_a).await; + Arc::get_mut(&mut lang_registry) .unwrap() .add(Arc::new(Language::new( @@ -2687,6 +2688,7 @@ mod tests { cx, ) }); + let (worktree_a, _) = project_a .update(&mut cx_a, |p, cx| { p.find_or_create_local_worktree("/root", false, cx)