diff --git a/zed/src/rpc_client.rs b/zed/src/rpc_client.rs index c4f1c45f303ed57061831ae764ce2584c425eb75..5cc257a209c7dc499bfd926b86336f0e557a85a3 100644 --- a/zed/src/rpc_client.rs +++ b/zed/src/rpc_client.rs @@ -6,7 +6,7 @@ use postage::{ prelude::{Sink, Stream}, }; use smol::{ - io::{ReadHalf, WriteHalf}, + io::{BoxedWriter, ReadHalf}, lock::Mutex, prelude::{AsyncRead, AsyncWrite}, }; @@ -22,18 +22,18 @@ use zed_rpc::proto::{ self, MessageStream, RequestMessage, SendMessage, ServerMessage, SubscribeMessage, }; -pub struct RpcClient { +pub struct RpcClient { response_channels: Arc, bool)>>>, - outgoing: Mutex>>, + outgoing: Mutex>, next_message_id: AtomicI32, _drop_tx: barrier::Sender, } -impl RpcClient -where - Conn: AsyncRead + AsyncWrite + Unpin + Send + 'static, -{ - pub fn new(conn: Conn, executor: Arc) -> Arc { +impl RpcClient { + pub fn new(conn: Conn, executor: Arc) -> Arc + where + Conn: AsyncRead + AsyncWrite + Unpin + Send + 'static, + { let response_channels = Arc::new(Mutex::new(HashMap::new())); let (conn_rx, conn_tx) = smol::io::split(conn); let (_drop_tx, drop_rx) = barrier::channel(); @@ -48,19 +48,21 @@ where Arc::new(Self { response_channels, - outgoing: Mutex::new(MessageStream::new(conn_tx)), + outgoing: Mutex::new(MessageStream::new(Box::pin(conn_tx))), _drop_tx, next_message_id: AtomicI32::new(0), }) } - async fn handle_incoming( + async fn handle_incoming( conn: ReadHalf, mut drop_rx: barrier::Receiver, response_channels: Arc< Mutex, bool)>>, >, - ) { + ) where + Conn: AsyncRead + Unpin, + { let dropped = drop_rx.recv(); smol::pin!(dropped);