diff --git a/crates/language/src/buffer.rs b/crates/language/src/buffer.rs index ddc6fa7c93d6d73d915c1a36a56c925a265a04f0..b53e02d733e59066a4387b8527512d22dff99510 100644 --- a/crates/language/src/buffer.rs +++ b/crates/language/src/buffer.rs @@ -1291,6 +1291,13 @@ impl Buffer { self.text.wait_for_edits(edit_ids) } + pub fn wait_for_anchors<'a>( + &mut self, + anchors: impl IntoIterator, + ) -> impl Future { + self.text.wait_for_anchors(anchors) + } + pub fn wait_for_version(&mut self, version: clock::Global) -> impl Future { self.text.wait_for_version(version) } diff --git a/crates/project/src/lsp_command.rs b/crates/project/src/lsp_command.rs index 3b502fc8fafc5accfc977eee572c853a68701b48..408c8a150ce93b2f182da2f2e664a0ebdb94da8e 100644 --- a/crates/project/src/lsp_command.rs +++ b/crates/project/src/lsp_command.rs @@ -31,10 +31,11 @@ pub(crate) trait LspCommand: 'static + Sized { ) -> Result; fn to_proto(&self, project_id: u64, buffer: &Buffer) -> Self::ProtoRequest; - fn from_proto( + async fn from_proto( message: Self::ProtoRequest, - project: &mut Project, - buffer: &Buffer, + project: ModelHandle, + buffer: ModelHandle, + cx: AsyncAppContext, ) -> Result; fn response_to_proto( response: Self::Response, @@ -121,19 +122,28 @@ impl LspCommand for PrepareRename { position: Some(language::proto::serialize_anchor( &buffer.anchor_before(self.position), )), + version: (&buffer.version()).into(), } } - fn from_proto(message: proto::PrepareRename, _: &mut Project, buffer: &Buffer) -> Result { + async fn from_proto( + message: proto::PrepareRename, + _: ModelHandle, + buffer: ModelHandle, + mut cx: AsyncAppContext, + ) -> Result { let position = message .position .and_then(deserialize_anchor) .ok_or_else(|| anyhow!("invalid position"))?; - if !buffer.can_resolve(&position) { - Err(anyhow!("cannot resolve position"))?; - } + buffer + .update(&mut cx, |buffer, _| { + buffer.wait_for_version(message.version.into()) + }) + .await; + Ok(Self { - position: position.to_point_utf16(buffer), + position: buffer.read_with(&cx, |buffer, _| position.to_point_utf16(buffer)), }) } @@ -241,19 +251,27 @@ impl LspCommand for PerformRename { &buffer.anchor_before(self.position), )), new_name: self.new_name.clone(), + version: (&buffer.version()).into(), } } - fn from_proto(message: proto::PerformRename, _: &mut Project, buffer: &Buffer) -> Result { + async fn from_proto( + message: proto::PerformRename, + _: ModelHandle, + buffer: ModelHandle, + mut cx: AsyncAppContext, + ) -> Result { let position = message .position .and_then(deserialize_anchor) .ok_or_else(|| anyhow!("invalid position"))?; - if !buffer.can_resolve(&position) { - Err(anyhow!("cannot resolve position"))?; - } + buffer + .update(&mut cx, |buffer, _| { + buffer.wait_for_version(message.version.into()) + }) + .await; Ok(Self { - position: position.to_point_utf16(buffer), + position: buffer.read_with(&cx, |buffer, _| position.to_point_utf16(buffer)), new_name: message.new_name, push_to_history: false, }) @@ -385,19 +403,27 @@ impl LspCommand for GetDefinition { position: Some(language::proto::serialize_anchor( &buffer.anchor_before(self.position), )), + version: (&buffer.version()).into(), } } - fn from_proto(message: proto::GetDefinition, _: &mut Project, buffer: &Buffer) -> Result { + async fn from_proto( + message: proto::GetDefinition, + _: ModelHandle, + buffer: ModelHandle, + mut cx: AsyncAppContext, + ) -> Result { let position = message .position .and_then(deserialize_anchor) .ok_or_else(|| anyhow!("invalid position"))?; - if !buffer.can_resolve(&position) { - Err(anyhow!("cannot resolve position"))?; - } + buffer + .update(&mut cx, |buffer, _| { + buffer.wait_for_version(message.version.into()) + }) + .await; Ok(Self { - position: position.to_point_utf16(buffer), + position: buffer.read_with(&cx, |buffer, _| position.to_point_utf16(buffer)), }) } @@ -443,6 +469,9 @@ impl LspCommand for GetDefinition { .end .and_then(deserialize_anchor) .ok_or_else(|| anyhow!("missing target end"))?; + buffer + .update(&mut cx, |buffer, _| buffer.wait_for_anchors([&start, &end])) + .await; locations.push(Location { buffer, range: start..end, @@ -533,19 +562,27 @@ impl LspCommand for GetReferences { position: Some(language::proto::serialize_anchor( &buffer.anchor_before(self.position), )), + version: (&buffer.version()).into(), } } - fn from_proto(message: proto::GetReferences, _: &mut Project, buffer: &Buffer) -> Result { + async fn from_proto( + message: proto::GetReferences, + _: ModelHandle, + buffer: ModelHandle, + mut cx: AsyncAppContext, + ) -> Result { let position = message .position .and_then(deserialize_anchor) .ok_or_else(|| anyhow!("invalid position"))?; - if !buffer.can_resolve(&position) { - Err(anyhow!("cannot resolve position"))?; - } + buffer + .update(&mut cx, |buffer, _| { + buffer.wait_for_version(message.version.into()) + }) + .await; Ok(Self { - position: position.to_point_utf16(buffer), + position: buffer.read_with(&cx, |buffer, _| position.to_point_utf16(buffer)), }) } @@ -591,6 +628,9 @@ impl LspCommand for GetReferences { .end .and_then(deserialize_anchor) .ok_or_else(|| anyhow!("missing target end"))?; + target_buffer + .update(&mut cx, |buffer, _| buffer.wait_for_anchors([&start, &end])) + .await; locations.push(Location { buffer: target_buffer, range: start..end, @@ -658,23 +698,27 @@ impl LspCommand for GetDocumentHighlights { position: Some(language::proto::serialize_anchor( &buffer.anchor_before(self.position), )), + version: (&buffer.version()).into(), } } - fn from_proto( + async fn from_proto( message: proto::GetDocumentHighlights, - _: &mut Project, - buffer: &Buffer, + _: ModelHandle, + buffer: ModelHandle, + mut cx: AsyncAppContext, ) -> Result { let position = message .position .and_then(deserialize_anchor) .ok_or_else(|| anyhow!("invalid position"))?; - if !buffer.can_resolve(&position) { - Err(anyhow!("cannot resolve position"))?; - } + buffer + .update(&mut cx, |buffer, _| { + buffer.wait_for_version(message.version.into()) + }) + .await; Ok(Self { - position: position.to_point_utf16(buffer), + position: buffer.read_with(&cx, |buffer, _| position.to_point_utf16(buffer)), }) } @@ -705,33 +749,34 @@ impl LspCommand for GetDocumentHighlights { self, message: proto::GetDocumentHighlightsResponse, _: ModelHandle, - _: ModelHandle, - _: AsyncAppContext, + buffer: ModelHandle, + mut cx: AsyncAppContext, ) -> Result> { - Ok(message - .highlights - .into_iter() - .map(|highlight| { - let start = highlight - .start - .and_then(deserialize_anchor) - .ok_or_else(|| anyhow!("missing target start"))?; - let end = highlight - .end - .and_then(deserialize_anchor) - .ok_or_else(|| anyhow!("missing target end"))?; - let kind = match proto::document_highlight::Kind::from_i32(highlight.kind) { - Some(proto::document_highlight::Kind::Text) => DocumentHighlightKind::TEXT, - Some(proto::document_highlight::Kind::Read) => DocumentHighlightKind::READ, - Some(proto::document_highlight::Kind::Write) => DocumentHighlightKind::WRITE, - None => DocumentHighlightKind::TEXT, - }; - Ok(DocumentHighlight { - range: start..end, - kind, - }) - }) - .collect::>>()?) + let mut highlights = Vec::new(); + for highlight in message.highlights { + let start = highlight + .start + .and_then(deserialize_anchor) + .ok_or_else(|| anyhow!("missing target start"))?; + let end = highlight + .end + .and_then(deserialize_anchor) + .ok_or_else(|| anyhow!("missing target end"))?; + buffer + .update(&mut cx, |buffer, _| buffer.wait_for_anchors([&start, &end])) + .await; + let kind = match proto::document_highlight::Kind::from_i32(highlight.kind) { + Some(proto::document_highlight::Kind::Text) => DocumentHighlightKind::TEXT, + Some(proto::document_highlight::Kind::Read) => DocumentHighlightKind::READ, + Some(proto::document_highlight::Kind::Write) => DocumentHighlightKind::WRITE, + None => DocumentHighlightKind::TEXT, + }; + highlights.push(DocumentHighlight { + range: start..end, + kind, + }); + } + Ok(highlights) } fn buffer_id_from_proto(message: &proto::GetDocumentHighlights) -> u64 { diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index e4b8ed95d3ba301104af07869ff55f157535d4f5..616c555151cfe97878086b9db3556178884c98b3 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -1815,6 +1815,7 @@ impl Project { }) } else if let Some(project_id) = self.remote_id() { let rpc = self.client.clone(); + let version = buffer.version(); cx.spawn_weak(|_, mut cx| async move { let response = rpc .request(proto::GetCodeActions { @@ -1822,6 +1823,7 @@ impl Project { buffer_id, start: Some(language::proto::serialize_anchor(&range.start)), end: Some(language::proto::serialize_anchor(&range.end)), + version: (&version).into(), }) .await?; @@ -2840,13 +2842,11 @@ impl Project { .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?; Ok::<_, anyhow::Error>((project_id, buffer)) })?; - - if !buffer - .read_with(&cx, |buffer, _| buffer.version()) - .observed_all(&requested_version) - { - Err(anyhow!("save request depends on unreceived edits"))?; - } + buffer + .update(&mut cx, |buffer, _| { + buffer.wait_for_version(requested_version) + }) + .await; let (saved_version, mtime) = buffer.update(&mut cx, |buffer, cx| buffer.save(cx)).await?; Ok(proto::BufferSaved { @@ -2904,12 +2904,9 @@ impl Project { .map(|buffer| buffer.upgrade(cx).unwrap()) .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id)) })?; - if !buffer - .read_with(&cx, |buffer, _| buffer.version()) - .observed_all(&version) - { - Err(anyhow!("completion request depends on unreceived edits"))?; - } + buffer + .update(&mut cx, |buffer, _| buffer.wait_for_version(version)) + .await; let version = buffer.read_with(&cx, |buffer, _| buffer.version()); let completions = this .update(&mut cx, |this, cx| this.completions(&buffer, position, cx)) @@ -2979,10 +2976,13 @@ impl Project { .map(|buffer| buffer.upgrade(cx).unwrap()) .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id)) })?; + buffer + .update(&mut cx, |buffer, _| { + buffer.wait_for_version(envelope.payload.version.into()) + }) + .await; + let version = buffer.read_with(&cx, |buffer, _| buffer.version()); - if !version.observed(start.timestamp) || !version.observed(end.timestamp) { - Err(anyhow!("code action request references unreceived edits"))?; - } let code_actions = this.update(&mut cx, |this, cx| { Ok::<_, anyhow::Error>(this.code_actions(&buffer, start..end, cx)) })?; @@ -3038,19 +3038,26 @@ impl Project { ::Result: Send, { let sender_id = envelope.original_sender_id()?; - let (request, buffer_version) = this.update(&mut cx, |this, cx| { - let buffer_id = T::buffer_id_from_proto(&envelope.payload); - let buffer_handle = this - .opened_buffers + let buffer_id = T::buffer_id_from_proto(&envelope.payload); + let buffer_handle = this.read_with(&cx, |this, _| { + this.opened_buffers .get(&buffer_id) - .map(|buffer| buffer.upgrade(cx).unwrap()) - .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?; - let buffer = buffer_handle.read(cx); - let buffer_version = buffer.version(); - let request = T::from_proto(envelope.payload, this, buffer)?; - Ok::<_, anyhow::Error>((this.request_lsp(buffer_handle, request, cx), buffer_version)) + .map(|buffer| buffer.upgrade(&cx).unwrap()) + .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id)) })?; - let response = request.await?; + let request = T::from_proto( + envelope.payload, + this.clone(), + buffer_handle.clone(), + cx.clone(), + ) + .await?; + let buffer_version = buffer_handle.read_with(&cx, |buffer, _| buffer.version()); + let response = this + .update(&mut cx, |this, cx| { + this.request_lsp(buffer_handle, request, cx) + }) + .await?; this.update(&mut cx, |this, cx| { Ok(T::response_to_proto( response, diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index 4d55208872212b93af91f6afb556b26adf72b56a..801334d3fb6a907ee67dd5664fd5a6d6d86e1238 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -164,6 +164,7 @@ message GetDefinition { uint64 project_id = 1; uint64 buffer_id = 2; Anchor position = 3; + repeated VectorClockEntry version = 4; } message GetDefinitionResponse { @@ -174,6 +175,7 @@ message GetReferences { uint64 project_id = 1; uint64 buffer_id = 2; Anchor position = 3; + repeated VectorClockEntry version = 4; } message GetReferencesResponse { @@ -184,6 +186,7 @@ message GetDocumentHighlights { uint64 project_id = 1; uint64 buffer_id = 2; Anchor position = 3; + repeated VectorClockEntry version = 4; } message GetDocumentHighlightsResponse { @@ -328,6 +331,7 @@ message GetCodeActions { uint64 buffer_id = 2; Anchor start = 3; Anchor end = 4; + repeated VectorClockEntry version = 5; } message GetCodeActionsResponse { @@ -349,6 +353,7 @@ message PrepareRename { uint64 project_id = 1; uint64 buffer_id = 2; Anchor position = 3; + repeated VectorClockEntry version = 4; } message PrepareRenameResponse { @@ -363,6 +368,7 @@ message PerformRename { uint64 buffer_id = 2; Anchor position = 3; string new_name = 4; + repeated VectorClockEntry version = 5; } message PerformRenameResponse { diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index c5da067f17dd1403631371237cdfd4afb9c6e132..f60c2d2c0fb2be91cb0356d3b938c317e241d2a1 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -157,15 +157,15 @@ messages!( (GetChannels, Foreground), (GetChannelsResponse, Foreground), (GetCodeActions, Background), - (GetCodeActionsResponse, Foreground), + (GetCodeActionsResponse, Background), (GetCompletions, Background), - (GetCompletionsResponse, Foreground), - (GetDefinition, Foreground), - (GetDefinitionResponse, Foreground), + (GetCompletionsResponse, Background), + (GetDefinition, Background), + (GetDefinitionResponse, Background), (GetDocumentHighlights, Background), (GetDocumentHighlightsResponse, Background), - (GetReferences, Foreground), - (GetReferencesResponse, Foreground), + (GetReferences, Background), + (GetReferencesResponse, Background), (GetProjectSymbols, Background), (GetProjectSymbolsResponse, Background), (GetUsers, Foreground), @@ -176,10 +176,10 @@ messages!( (JoinProjectResponse, Foreground), (LeaveChannel, Foreground), (LeaveProject, Foreground), - (OpenBuffer, Foreground), - (OpenBufferForSymbol, Foreground), - (OpenBufferForSymbolResponse, Foreground), - (OpenBufferResponse, Foreground), + (OpenBuffer, Background), + (OpenBufferForSymbol, Background), + (OpenBufferForSymbolResponse, Background), + (OpenBufferResponse, Background), (PerformRename, Background), (PerformRenameResponse, Background), (PrepareRename, Background), @@ -199,7 +199,7 @@ messages!( (UnregisterProject, Foreground), (UnregisterWorktree, Foreground), (UnshareProject, Foreground), - (UpdateBuffer, Foreground), + (UpdateBuffer, Background), (UpdateBufferFile, Foreground), (UpdateContacts, Foreground), (UpdateDiagnosticSummary, Foreground), diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index 2b00bfc7f6daf17ae9077862c0b9ecd36f5a73c0..9a8b4ee161686b1567aa2e4aef3dd957d12d3043 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -4911,12 +4911,9 @@ mod tests { ); (buffer.version(), buffer.save(cx)) }); - let save = cx.spawn(|cx| async move { + let save = cx.background().spawn(async move { let (saved_version, _) = save.await.expect("save request failed"); - buffer.read_with(&cx, |buffer, _| { - assert!(buffer.version().observed_all(&saved_version)); - assert!(saved_version.observed_all(&requested_version)); - }); + assert!(saved_version.observed_all(&requested_version)); }); if rng.lock().gen_bool(0.3) { log::info!("Guest {}: detaching save request", guest_id); diff --git a/crates/text/src/text.rs b/crates/text/src/text.rs index d92a36dd4313e0e6aed333f5634e07d99d619164..e9e106ea83ffe192273c5216a99a65cad43fc478 100644 --- a/crates/text/src/text.rs +++ b/crates/text/src/text.rs @@ -1307,6 +1307,32 @@ impl Buffer { } } + pub fn wait_for_anchors<'a>( + &mut self, + anchors: impl IntoIterator, + ) -> impl 'static + Future { + let mut futures = Vec::new(); + for anchor in anchors { + if !self.version.observed(anchor.timestamp) + && *anchor != Anchor::max() + && *anchor != Anchor::min() + { + let (tx, rx) = oneshot::channel(); + self.edit_id_resolvers + .entry(anchor.timestamp) + .or_default() + .push(tx); + futures.push(rx); + } + } + + async move { + for mut future in futures { + future.recv().await; + } + } + } + pub fn wait_for_version(&mut self, version: clock::Global) -> impl Future { let (tx, mut rx) = barrier::channel(); if !self.snapshot.version.observed_all(&version) {