Distinguish between "foreground" and "background" RPC messages

Max Brunsfeld and Antonio Scandurra created

Some types of messages, which entail state updates on the host, should be
processed in the order that they were sent. Other types of messages should
not block the processing of other messages.

Co-Authored-By: Antonio Scandurra <me@as-cii.com>

Change summary

crates/rpc/src/proto.rs  | 122 +++++++++++++++++++++++------------------
crates/server/src/rpc.rs |  12 +++-
2 files changed, 76 insertions(+), 58 deletions(-)

Detailed changes

crates/rpc/src/proto.rs 🔗

@@ -13,6 +13,7 @@ include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
 
 pub trait EnvelopedMessage: Clone + Sized + Send + Sync + 'static {
     const NAME: &'static str;
+    const PRIORITY: MessagePriority;
     fn into_envelope(
         self,
         id: u32,
@@ -35,6 +36,12 @@ pub trait AnyTypedEnvelope: 'static + Send + Sync {
     fn payload_type_name(&self) -> &'static str;
     fn as_any(&self) -> &dyn Any;
     fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
+    fn is_background(&self) -> bool;
+}
+
+pub enum MessagePriority {
+    Foreground,
+    Background,
 }
 
 impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
@@ -53,10 +60,14 @@ impl<T: EnvelopedMessage> AnyTypedEnvelope for TypedEnvelope<T> {
     fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
         self
     }
+
+    fn is_background(&self) -> bool {
+        matches!(T::PRIORITY, MessagePriority::Background)
+    }
 }
 
 macro_rules! messages {
-    ($($name:ident),* $(,)?) => {
+    ($(($name:ident, $priority:ident)),* $(,)?) => {
         pub fn build_typed_envelope(sender_id: ConnectionId, envelope: Envelope) -> Option<Box<dyn AnyTypedEnvelope>> {
             match envelope.payload {
                 $(Some(envelope::Payload::$name(payload)) => {
@@ -74,6 +85,7 @@ macro_rules! messages {
         $(
             impl EnvelopedMessage for $name {
                 const NAME: &'static str = std::stringify!($name);
+                const PRIORITY: MessagePriority = MessagePriority::$priority;
 
                 fn into_envelope(
                     self,
@@ -120,60 +132,60 @@ macro_rules! entity_messages {
 }
 
 messages!(
-    Ack,
-    AddProjectCollaborator,
-    ApplyCodeAction,
-    ApplyCodeActionResponse,
-    ApplyCompletionAdditionalEdits,
-    ApplyCompletionAdditionalEditsResponse,
-    BufferReloaded,
-    BufferSaved,
-    ChannelMessageSent,
-    CloseBuffer,
-    DiskBasedDiagnosticsUpdated,
-    DiskBasedDiagnosticsUpdating,
-    Error,
-    FormatBuffers,
-    FormatBuffersResponse,
-    GetChannelMessages,
-    GetChannelMessagesResponse,
-    GetChannels,
-    GetChannelsResponse,
-    GetCodeActions,
-    GetCodeActionsResponse,
-    GetCompletions,
-    GetCompletionsResponse,
-    GetDefinition,
-    GetDefinitionResponse,
-    GetUsers,
-    GetUsersResponse,
-    JoinChannel,
-    JoinChannelResponse,
-    JoinProject,
-    JoinProjectResponse,
-    LeaveChannel,
-    LeaveProject,
-    OpenBuffer,
-    OpenBufferResponse,
-    RegisterProjectResponse,
-    Ping,
-    RegisterProject,
-    RegisterWorktree,
-    RemoveProjectCollaborator,
-    SaveBuffer,
-    SendChannelMessage,
-    SendChannelMessageResponse,
-    ShareProject,
-    ShareWorktree,
-    Test,
-    UnregisterProject,
-    UnregisterWorktree,
-    UnshareProject,
-    UpdateBuffer,
-    UpdateBufferFile,
-    UpdateContacts,
-    UpdateDiagnosticSummary,
-    UpdateWorktree,
+    (Ack, Foreground),
+    (AddProjectCollaborator, Foreground),
+    (ApplyCodeAction, Foreground),
+    (ApplyCodeActionResponse, Foreground),
+    (ApplyCompletionAdditionalEdits, Foreground),
+    (ApplyCompletionAdditionalEditsResponse, Foreground),
+    (BufferReloaded, Foreground),
+    (BufferSaved, Foreground),
+    (ChannelMessageSent, Foreground),
+    (CloseBuffer, Foreground),
+    (DiskBasedDiagnosticsUpdated, Background),
+    (DiskBasedDiagnosticsUpdating, Background),
+    (Error, Foreground),
+    (FormatBuffers, Foreground),
+    (FormatBuffersResponse, Foreground),
+    (GetChannelMessages, Foreground),
+    (GetChannelMessagesResponse, Foreground),
+    (GetChannels, Foreground),
+    (GetChannelsResponse, Foreground),
+    (GetCodeActions, Background),
+    (GetCodeActionsResponse, Foreground),
+    (GetCompletions, Background),
+    (GetCompletionsResponse, Foreground),
+    (GetDefinition, Foreground),
+    (GetDefinitionResponse, Foreground),
+    (GetUsers, Foreground),
+    (GetUsersResponse, Foreground),
+    (JoinChannel, Foreground),
+    (JoinChannelResponse, Foreground),
+    (JoinProject, Foreground),
+    (JoinProjectResponse, Foreground),
+    (LeaveChannel, Foreground),
+    (LeaveProject, Foreground),
+    (OpenBuffer, Foreground),
+    (OpenBufferResponse, Foreground),
+    (RegisterProjectResponse, Foreground),
+    (Ping, Foreground),
+    (RegisterProject, Foreground),
+    (RegisterWorktree, Foreground),
+    (RemoveProjectCollaborator, Foreground),
+    (SaveBuffer, Foreground),
+    (SendChannelMessage, Foreground),
+    (SendChannelMessageResponse, Foreground),
+    (ShareProject, Foreground),
+    (ShareWorktree, Foreground),
+    (Test, Foreground),
+    (UnregisterProject, Foreground),
+    (UnregisterWorktree, Foreground),
+    (UnshareProject, Foreground),
+    (UpdateBuffer, Foreground),
+    (UpdateBufferFile, Foreground),
+    (UpdateContacts, Foreground),
+    (UpdateDiagnosticSummary, Foreground),
+    (UpdateWorktree, Foreground),
 );
 
 request_messages!(

crates/server/src/rpc.rs 🔗

@@ -190,9 +190,10 @@ impl Server {
                             let type_name = message.payload_type_name();
                             log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
                             if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
-                                let handle_message = (handler)(this.clone(), message);
                                 let notifications = this.notifications.clone();
-                                executor.spawn_detached(async move {
+                                let is_background = message.is_background();
+                                let handle_message = (handler)(this.clone(), message);
+                                let handle_message = async move {
                                     if let Err(err) = handle_message.await {
                                         log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
                                     } else {
@@ -201,7 +202,12 @@ impl Server {
                                     if let Some(mut notifications) = notifications {
                                         let _ = notifications.send(()).await;
                                     }
-                                });
+                                };
+                                if is_background {
+                                    executor.spawn_detached(handle_message);
+                                } else {
+                                    handle_message.await;
+                                }
                             } else {
                                 log::warn!("unhandled message: {}", type_name);
                             }