From cf4291a126abf0bc1692146e47a3f4a762e468f4 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Wed, 16 Feb 2022 10:01:23 -0800 Subject: [PATCH] Distinguish between "foreground" and "background" RPC messages Some types of messages, which entail state updates on the host, should be processed in the order that they were sent. Other types of messages should not block the processing of other messages. Co-Authored-By: Antonio Scandurra --- crates/rpc/src/proto.rs | 122 +++++++++++++++++++++------------------ crates/server/src/rpc.rs | 12 +++- 2 files changed, 76 insertions(+), 58 deletions(-) diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index 3f062df97c1327a324d1479a075fb6cfc1abe4be..6352c466c94e24f35946c262fd6bfa4e18bf23a5 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -13,6 +13,7 @@ include!(concat!(env!("OUT_DIR"), "/zed.messages.rs")); pub trait EnvelopedMessage: Clone + Sized + Send + Sync + 'static { const NAME: &'static str; + const PRIORITY: MessagePriority; fn into_envelope( self, id: u32, @@ -35,6 +36,12 @@ pub trait AnyTypedEnvelope: 'static + Send + Sync { fn payload_type_name(&self) -> &'static str; fn as_any(&self) -> &dyn Any; fn into_any(self: Box) -> Box; + fn is_background(&self) -> bool; +} + +pub enum MessagePriority { + Foreground, + Background, } impl AnyTypedEnvelope for TypedEnvelope { @@ -53,10 +60,14 @@ impl AnyTypedEnvelope for TypedEnvelope { fn into_any(self: Box) -> Box { self } + + fn is_background(&self) -> bool { + matches!(T::PRIORITY, MessagePriority::Background) + } } macro_rules! messages { - ($($name:ident),* $(,)?) => { + ($(($name:ident, $priority:ident)),* $(,)?) => { pub fn build_typed_envelope(sender_id: ConnectionId, envelope: Envelope) -> Option> { match envelope.payload { $(Some(envelope::Payload::$name(payload)) => { @@ -74,6 +85,7 @@ macro_rules! messages { $( impl EnvelopedMessage for $name { const NAME: &'static str = std::stringify!($name); + const PRIORITY: MessagePriority = MessagePriority::$priority; fn into_envelope( self, @@ -120,60 +132,60 @@ macro_rules! entity_messages { } messages!( - Ack, - AddProjectCollaborator, - ApplyCodeAction, - ApplyCodeActionResponse, - ApplyCompletionAdditionalEdits, - ApplyCompletionAdditionalEditsResponse, - BufferReloaded, - BufferSaved, - ChannelMessageSent, - CloseBuffer, - DiskBasedDiagnosticsUpdated, - DiskBasedDiagnosticsUpdating, - Error, - FormatBuffers, - FormatBuffersResponse, - GetChannelMessages, - GetChannelMessagesResponse, - GetChannels, - GetChannelsResponse, - GetCodeActions, - GetCodeActionsResponse, - GetCompletions, - GetCompletionsResponse, - GetDefinition, - GetDefinitionResponse, - GetUsers, - GetUsersResponse, - JoinChannel, - JoinChannelResponse, - JoinProject, - JoinProjectResponse, - LeaveChannel, - LeaveProject, - OpenBuffer, - OpenBufferResponse, - RegisterProjectResponse, - Ping, - RegisterProject, - RegisterWorktree, - RemoveProjectCollaborator, - SaveBuffer, - SendChannelMessage, - SendChannelMessageResponse, - ShareProject, - ShareWorktree, - Test, - UnregisterProject, - UnregisterWorktree, - UnshareProject, - UpdateBuffer, - UpdateBufferFile, - UpdateContacts, - UpdateDiagnosticSummary, - UpdateWorktree, + (Ack, Foreground), + (AddProjectCollaborator, Foreground), + (ApplyCodeAction, Foreground), + (ApplyCodeActionResponse, Foreground), + (ApplyCompletionAdditionalEdits, Foreground), + (ApplyCompletionAdditionalEditsResponse, Foreground), + (BufferReloaded, Foreground), + (BufferSaved, Foreground), + (ChannelMessageSent, Foreground), + (CloseBuffer, Foreground), + (DiskBasedDiagnosticsUpdated, Background), + (DiskBasedDiagnosticsUpdating, Background), + (Error, Foreground), + (FormatBuffers, Foreground), + (FormatBuffersResponse, Foreground), + (GetChannelMessages, Foreground), + (GetChannelMessagesResponse, Foreground), + (GetChannels, Foreground), + (GetChannelsResponse, Foreground), + (GetCodeActions, Background), + (GetCodeActionsResponse, Foreground), + (GetCompletions, Background), + (GetCompletionsResponse, Foreground), + (GetDefinition, Foreground), + (GetDefinitionResponse, Foreground), + (GetUsers, Foreground), + (GetUsersResponse, Foreground), + (JoinChannel, Foreground), + (JoinChannelResponse, Foreground), + (JoinProject, Foreground), + (JoinProjectResponse, Foreground), + (LeaveChannel, Foreground), + (LeaveProject, Foreground), + (OpenBuffer, Foreground), + (OpenBufferResponse, Foreground), + (RegisterProjectResponse, Foreground), + (Ping, Foreground), + (RegisterProject, Foreground), + (RegisterWorktree, Foreground), + (RemoveProjectCollaborator, Foreground), + (SaveBuffer, Foreground), + (SendChannelMessage, Foreground), + (SendChannelMessageResponse, Foreground), + (ShareProject, Foreground), + (ShareWorktree, Foreground), + (Test, Foreground), + (UnregisterProject, Foreground), + (UnregisterWorktree, Foreground), + (UnshareProject, Foreground), + (UpdateBuffer, Foreground), + (UpdateBufferFile, Foreground), + (UpdateContacts, Foreground), + (UpdateDiagnosticSummary, Foreground), + (UpdateWorktree, Foreground), ); request_messages!( diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index 2c920560b39342463e5c7f1eee9357978e288f68..df3af546a15225bb7af887fcb7aca3158b92917d 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -190,9 +190,10 @@ impl Server { let type_name = message.payload_type_name(); log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name); if let Some(handler) = this.handlers.get(&message.payload_type_id()) { - let handle_message = (handler)(this.clone(), message); let notifications = this.notifications.clone(); - executor.spawn_detached(async move { + let is_background = message.is_background(); + let handle_message = (handler)(this.clone(), message); + let handle_message = async move { if let Err(err) = handle_message.await { log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err); } else { @@ -201,7 +202,12 @@ impl Server { if let Some(mut notifications) = notifications { let _ = notifications.send(()).await; } - }); + }; + if is_background { + executor.spawn_detached(handle_message); + } else { + handle_message.await; + } } else { log::warn!("unhandled message: {}", type_name); }