From 50a31721ebf6ddcd561830464121b4a948594cc3 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Thu, 17 Feb 2022 11:41:19 +0100 Subject: [PATCH] Wait for version before returning completions --- crates/language/src/buffer.rs | 4 ++++ crates/project/src/project.rs | 13 ++++++------- crates/server/src/rpc.rs | 15 +++++++++------ crates/text/src/text.rs | 16 +++++++++++++++- 4 files changed, 34 insertions(+), 14 deletions(-) diff --git a/crates/language/src/buffer.rs b/crates/language/src/buffer.rs index f19ff21081af8b2fb9773e8ea3ae6f6496b7c155..b4543b02b02d98cfb06ecf75bbc66391b0447d47 100644 --- a/crates/language/src/buffer.rs +++ b/crates/language/src/buffer.rs @@ -1283,6 +1283,10 @@ impl Buffer { self.text.wait_for_edits(edit_ids) } + pub fn wait_for_version(&mut self, version: clock::Global) -> impl Future { + self.text.wait_for_version(version) + } + pub fn set_active_selections( &mut self, selections: Arc<[Selection]>, diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index be7ba09d32e660ed22318c226915b73b53be5d10..1c1f3dcdb4555dd4283bab770d934b75c004da63 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -1401,15 +1401,14 @@ impl Project { position: Some(language::proto::serialize_anchor(&anchor)), version: (&source_buffer.version()).into(), }; - cx.spawn_weak(|_, cx| async move { + cx.spawn_weak(|_, mut cx| async move { let response = rpc.request(message).await?; - if !source_buffer_handle - .read_with(&cx, |buffer, _| buffer.version()) - .observed_all(&response.version.into()) - { - Err(anyhow!("completion response depends on unreceived edits"))?; - } + source_buffer_handle + .update(&mut cx, |buffer, _| { + buffer.wait_for_version(response.version.into()) + }) + .await; response .completions diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index 782ffa7a235b6def29ca205b6b014343d37a1c6d..9c6f15c97e186b88169f34be81b83411010abebd 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -4196,17 +4196,18 @@ mod tests { drop(buffer); }); } - 10..=14 => { + 10..=19 => { let completions = project.update(&mut cx, |project, cx| { log::info!( "Guest {}: requesting completions for buffer {:?}", guest_id, buffer.read(cx).file().unwrap().full_path(cx) ); - project.completions(&buffer, 0, cx) + let offset = rng.borrow_mut().gen_range(0..=buffer.read(cx).len()); + project.completions(&buffer, offset, cx) }); let completions = cx.background().spawn(async move { - completions.await.expect("code actions request failed"); + completions.await.expect("completions request failed"); }); if rng.borrow_mut().gen_bool(0.3) { log::info!("Guest {}: detaching completions request", guest_id); @@ -4215,14 +4216,16 @@ mod tests { completions.await; } } - 15..=19 => { + 20..=29 => { let code_actions = project.update(&mut cx, |project, cx| { log::info!( "Guest {}: requesting code actions for buffer {:?}", guest_id, buffer.read(cx).file().unwrap().full_path(cx) ); - project.code_actions(&buffer, 0..0, cx) + let range = + buffer.read(cx).random_byte_range(0, &mut *rng.borrow_mut()); + project.code_actions(&buffer, range, cx) }); let code_actions = cx.background().spawn(async move { code_actions.await.expect("code actions request failed"); @@ -4234,7 +4237,7 @@ mod tests { code_actions.await; } } - 20..=29 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => { + 30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => { let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| { log::info!( "Guest {}: saving buffer {:?}", diff --git a/crates/text/src/text.rs b/crates/text/src/text.rs index 9b7f8dd230e0210b372c0b2ad9f0faf1aef1f004..da003b5d443616b76498a5eb5b51990c66830cf8 100644 --- a/crates/text/src/text.rs +++ b/crates/text/src/text.rs @@ -21,7 +21,7 @@ use operation_queue::OperationQueue; pub use patch::Patch; pub use point::*; pub use point_utf16::*; -use postage::{oneshot, prelude::*}; +use postage::{barrier, oneshot, prelude::*}; #[cfg(any(test, feature = "test-support"))] pub use random_char_iter::*; use rope::TextDimension; @@ -53,6 +53,7 @@ pub struct Buffer { pub lamport_clock: clock::Lamport, subscriptions: Topic, edit_id_resolvers: HashMap>>, + version_barriers: Vec<(clock::Global, barrier::Sender)>, } #[derive(Clone, Debug)] @@ -574,6 +575,7 @@ impl Buffer { lamport_clock, subscriptions: Default::default(), edit_id_resolvers: Default::default(), + version_barriers: Default::default(), } } @@ -835,6 +837,8 @@ impl Buffer { } } } + self.version_barriers + .retain(|(version, _)| !self.snapshot.version().observed_all(version)); Ok(()) } @@ -1305,6 +1309,16 @@ impl Buffer { } } + pub fn wait_for_version(&mut self, version: clock::Global) -> impl Future { + let (tx, mut rx) = barrier::channel(); + if !self.snapshot.version.observed_all(&version) { + self.version_barriers.push((version, tx)); + } + async move { + rx.recv().await; + } + } + fn resolve_edit(&mut self, edit_id: clock::Local) { for mut tx in self .edit_id_resolvers