@@ -6,7 +6,7 @@ use postage::{
prelude::{Sink, Stream},
};
use smol::{
- io::{ReadHalf, WriteHalf},
+ io::{BoxedWriter, ReadHalf},
lock::Mutex,
prelude::{AsyncRead, AsyncWrite},
};
@@ -22,18 +22,18 @@ use zed_rpc::proto::{
self, MessageStream, RequestMessage, SendMessage, ServerMessage, SubscribeMessage,
};
-pub struct RpcClient<Conn> {
+pub struct RpcClient {
response_channels: Arc<Mutex<HashMap<i32, (mpsc::Sender<proto::from_server::Variant>, bool)>>>,
- outgoing: Mutex<MessageStream<WriteHalf<Conn>>>,
+ outgoing: Mutex<MessageStream<BoxedWriter>>,
next_message_id: AtomicI32,
_drop_tx: barrier::Sender,
}
-impl<Conn> RpcClient<Conn>
-where
- Conn: AsyncRead + AsyncWrite + Unpin + Send + 'static,
-{
- pub fn new(conn: Conn, executor: Arc<Background>) -> Arc<Self> {
+impl RpcClient {
+ pub fn new<Conn>(conn: Conn, executor: Arc<Background>) -> Arc<Self>
+ where
+ Conn: AsyncRead + AsyncWrite + Unpin + Send + 'static,
+ {
let response_channels = Arc::new(Mutex::new(HashMap::new()));
let (conn_rx, conn_tx) = smol::io::split(conn);
let (_drop_tx, drop_rx) = barrier::channel();
@@ -48,19 +48,21 @@ where
Arc::new(Self {
response_channels,
- outgoing: Mutex::new(MessageStream::new(conn_tx)),
+ outgoing: Mutex::new(MessageStream::new(Box::pin(conn_tx))),
_drop_tx,
next_message_id: AtomicI32::new(0),
})
}
- async fn handle_incoming(
+ async fn handle_incoming<Conn>(
conn: ReadHalf<Conn>,
mut drop_rx: barrier::Receiver,
response_channels: Arc<
Mutex<HashMap<i32, (mpsc::Sender<proto::from_server::Variant>, bool)>>,
>,
- ) {
+ ) where
+ Conn: AsyncRead + Unpin,
+ {
let dropped = drop_rx.recv();
smol::pin!(dropped);