From 5e4872fdf845f036585bbb7e168fdf57af9ba664 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Wed, 16 Jun 2021 17:43:53 +0200 Subject: [PATCH] Replace Request and Message with a single TypedEnvelope Co-Authored-By: Nathan Sobo --- zed/src/rpc_client.rs | 124 ++++++++++++------------------------------ zed/src/util.rs | 64 ++-------------------- zed/src/workspace.rs | 10 ++-- 3 files changed, 47 insertions(+), 151 deletions(-) diff --git a/zed/src/rpc_client.rs b/zed/src/rpc_client.rs index 122a36b600f96ba9c3be5bd4ef2830d16b3bb874..ede3f69b32eba81af3ab5583e87f290e67a6da34 100644 --- a/zed/src/rpc_client.rs +++ b/zed/src/rpc_client.rs @@ -1,5 +1,8 @@ use anyhow::{anyhow, Result}; -use futures::future::Either; +use futures::{ + future::{BoxFuture, Either}, + FutureExt, +}; use postage::{ barrier, mpsc, oneshot, prelude::{Sink, Stream}, @@ -31,67 +34,27 @@ struct RpcConnection { } type MessageHandler = - Box, ConnectionId) -> Option>; - -struct ErasedMessage { - id: u32, - connection_id: ConnectionId, - body: proto::Envelope, -} - -pub struct Message { - connection_id: ConnectionId, - body: Option, -} - -impl From for Message { - fn from(message: ErasedMessage) -> Self { - Self { - connection_id: message.connection_id, - body: T::from_envelope(message.body), - } - } -} - -impl Message { - pub fn connection_id(&self) -> ConnectionId { - self.connection_id - } - - pub fn body(&mut self) -> T { - self.body.take().expect("body already taken") - } -} + Box, ConnectionId) -> Option>>; -pub struct Request { +pub struct TypedEnvelope { id: u32, connection_id: ConnectionId, - body: Option, + payload: T, } -impl From for Request { - fn from(message: ErasedMessage) -> Self { - Self { - id: message.id, - connection_id: message.connection_id, - body: T::from_envelope(message.body), - } - } -} - -impl Request { +impl TypedEnvelope { pub fn connection_id(&self) -> ConnectionId { self.connection_id } - pub fn body(&mut self) -> T { - self.body.take().expect("body already taken") + pub fn payload(&self) -> &T { + &self.payload } } pub struct RpcClient { connections: RwLock>>, - message_handlers: RwLock, MessageHandler)>>, + message_handlers: RwLock>, handler_types: Mutex>, next_connection_id: AtomicU32, } @@ -106,52 +69,37 @@ impl RpcClient { }) } - pub async fn add_request_handler(&self) -> impl Stream> { + pub async fn add_message_handler( + &self, + ) -> mpsc::Receiver> { if !self.handler_types.lock().await.insert(TypeId::of::()) { panic!("duplicate handler type"); } let (tx, rx) = mpsc::channel(256); - self.message_handlers.write().await.push(( - tx, - Box::new(move |envelope, connection_id| { - if envelope.as_ref().map_or(false, T::matches_envelope) { - let envelope = Option::take(envelope).unwrap(); - Some(ErasedMessage { - id: envelope.id, - connection_id, - body: envelope, - }) - } else { - None - } - }), - )); - rx.map(Request::from) - } - - pub async fn add_message_handler(&self) -> impl Stream> { - if !self.handler_types.lock().await.insert(TypeId::of::()) { - panic!("duplicate handler type"); - } - - let (tx, rx) = mpsc::channel(256); - self.message_handlers.write().await.push(( - tx, - Box::new(move |envelope, connection_id| { + self.message_handlers + .write() + .await + .push(Box::new(move |envelope, connection_id| { if envelope.as_ref().map_or(false, T::matches_envelope) { let envelope = Option::take(envelope).unwrap(); - Some(ErasedMessage { - id: envelope.id, - connection_id, - body: envelope, - }) + let mut tx = tx.clone(); + Some( + async move { + tx.send(TypedEnvelope { + id: envelope.id, + connection_id, + payload: T::from_envelope(envelope).unwrap(), + }) + .await; + } + .boxed(), + ) } else { None } - }), - )); - rx.map(Message::from) + })); + rx } pub async fn add_connection( @@ -208,9 +156,9 @@ impl RpcClient { } else { let mut handled = false; let mut envelope = Some(incoming); - for (tx, handler) in this.message_handlers.read().await.iter() { - if let Some(message) = handler(&mut envelope, connection_id) { - let _ = tx.clone().send(message).await; + for handler in this.message_handlers.read().await.iter() { + if let Some(future) = handler(&mut envelope, connection_id) { + future.await; handled = true; break; } @@ -303,7 +251,7 @@ impl RpcClient { pub fn respond( self: &Arc, - request: Request, + request: TypedEnvelope, response: T::Response, ) -> impl Future> { let this = self.clone(); diff --git a/zed/src/util.rs b/zed/src/util.rs index 643c07fb70e6c34db710d1b7ae72f1e10b62fb13..6b39bda341cc87468384a4b0c0a80c2f07dba374 100644 --- a/zed/src/util.rs +++ b/zed/src/util.rs @@ -1,4 +1,4 @@ -use crate::rpc_client::{Message, Request, RpcClient}; +use crate::rpc_client::{RpcClient, TypedEnvelope}; use postage::prelude::Stream; use rand::prelude::*; use std::{cmp::Ordering, future::Future, sync::Arc}; @@ -56,41 +56,12 @@ where } } -pub trait RequestHandler<'a, R: proto::RequestMessage> { - type Output: 'a + Future>; - - fn handle( - &self, - request: Request, - client: Arc, - cx: &'a mut gpui::AsyncAppContext, - ) -> Self::Output; -} - -impl<'a, R, F, Fut> RequestHandler<'a, R> for F -where - R: proto::RequestMessage, - F: Fn(Request, Arc, &'a mut gpui::AsyncAppContext) -> Fut, - Fut: 'a + Future>, -{ - type Output = Fut; - - fn handle( - &self, - request: Request, - client: Arc, - cx: &'a mut gpui::AsyncAppContext, - ) -> Self::Output { - (self)(request, client, cx) - } -} - pub trait MessageHandler<'a, M: proto::EnvelopedMessage> { type Output: 'a + Future>; fn handle( &self, - message: Message, + message: TypedEnvelope, client: Arc, cx: &'a mut gpui::AsyncAppContext, ) -> Self::Output; @@ -99,14 +70,14 @@ pub trait MessageHandler<'a, M: proto::EnvelopedMessage> { impl<'a, M, F, Fut> MessageHandler<'a, M> for F where M: proto::EnvelopedMessage, - F: Fn(Message, Arc, &'a mut gpui::AsyncAppContext) -> Fut, + F: Fn(TypedEnvelope, Arc, &'a mut gpui::AsyncAppContext) -> Fut, Fut: 'a + Future>, { type Output = Fut; fn handle( &self, - message: Message, + message: TypedEnvelope, client: Arc, cx: &'a mut gpui::AsyncAppContext, ) -> Self::Output { @@ -114,31 +85,8 @@ where } } -pub fn spawn_request_handler( - handler: H, - client: &Arc, - cx: &mut gpui::MutableAppContext, -) where - H: 'static + for<'a> RequestHandler<'a, R>, - R: proto::RequestMessage, -{ - let client = client.clone(); - let mut requests = smol::block_on(client.add_request_handler::()); - cx.spawn(|mut cx| async move { - while let Some(request) = requests.recv().await { - if let Err(err) = handler.handle(request, client.clone(), &mut cx).await { - log::error!("error handling request: {:?}", err); - } - } - }) - .detach(); -} - -pub fn spawn_message_handler( - handler: H, - client: &Arc, - cx: &mut gpui::MutableAppContext, -) where +pub fn handle_messages(handler: H, client: &Arc, cx: &mut gpui::MutableAppContext) +where H: 'static + for<'a> MessageHandler<'a, M>, M: proto::EnvelopedMessage, { diff --git a/zed/src/workspace.rs b/zed/src/workspace.rs index d053488ac99322a21b6143185489579f7af676e9..2ad38ceb7fd1aad031f7cc58dba5fb11908c5a20 100644 --- a/zed/src/workspace.rs +++ b/zed/src/workspace.rs @@ -4,7 +4,7 @@ pub mod pane_group; use crate::{ editor::{Buffer, Editor}, language::LanguageRegistry, - rpc_client::{Request, RpcClient}, + rpc_client::{RpcClient, TypedEnvelope}, settings::Settings, time::ReplicaId, util::{self, SurfResultExt as _}, @@ -46,7 +46,7 @@ pub fn init(cx: &mut MutableAppContext, rpc_client: Arc) { ]); pane::init(cx); - util::spawn_request_handler(handle_open_buffer, &rpc_client, cx); + util::handle_messages(handle_open_buffer, &rpc_client, cx); } pub struct OpenParams { @@ -109,12 +109,12 @@ fn open_paths(params: &OpenParams, cx: &mut MutableAppContext) { } async fn handle_open_buffer( - mut request: Request, + request: TypedEnvelope, rpc_client: Arc, cx: &mut AsyncAppContext, ) -> anyhow::Result<()> { - let body = request.body(); - dbg!(body.path); + let payload = request.payload(); + dbg!(&payload.path); rpc_client .respond(request, proto::OpenBufferResponse { buffer: None }) .await?;