Remove `postage` from `rpc`

Antonio Scandurra and Nathan Sobo created

Co-Authored-By: Nathan Sobo <nathan@zed.dev>

Change summary

Cargo.lock                |  1 -
crates/client/src/test.rs |  6 +-----
crates/rpc/Cargo.toml     |  1 -
crates/rpc/src/conn.rs    | 36 ++++++++++++++++++------------------
crates/rpc/src/peer.rs    | 34 +++++++++++++++++-----------------
crates/server/src/rpc.rs  | 13 ++++++++-----
6 files changed, 44 insertions(+), 47 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -4012,7 +4012,6 @@ dependencies = [
  "gpui",
  "log",
  "parking_lot",
- "postage",
  "prost",
  "prost-build",
  "rand 0.8.3",

crates/client/src/test.rs 🔗

@@ -6,7 +6,6 @@ use anyhow::{anyhow, Result};
 use futures::{future::BoxFuture, stream::BoxStream, Future, StreamExt};
 use gpui::{executor, ModelHandle, TestAppContext};
 use parking_lot::Mutex;
-use postage::barrier;
 use rpc::{proto, ConnectionId, Peer, Receipt, TypedEnvelope};
 use std::{fmt, rc::Rc, sync::Arc};
 
@@ -23,7 +22,6 @@ struct FakeServerState {
     connection_id: Option<ConnectionId>,
     forbid_connections: bool,
     auth_count: usize,
-    connection_killer: Option<barrier::Sender>,
     access_token: usize,
 }
 
@@ -76,15 +74,13 @@ impl FakeServer {
                             Err(EstablishConnectionError::Unauthorized)?
                         }
 
-                        let (client_conn, server_conn, kill) =
-                            Connection::in_memory(cx.background());
+                        let (client_conn, server_conn, _) = Connection::in_memory(cx.background());
                         let (connection_id, io, incoming) =
                             peer.add_test_connection(server_conn, cx.background()).await;
                         cx.background().spawn(io).detach();
                         let mut state = state.lock();
                         state.connection_id = Some(connection_id);
                         state.incoming = Some(incoming);
-                        state.connection_killer = Some(kill);
                         Ok(client_conn)
                     })
                 }

crates/rpc/Cargo.toml 🔗

@@ -23,7 +23,6 @@ base64 = "0.13"
 futures = "0.3"
 log = "0.4"
 parking_lot = "0.11.1"
-postage = { version = "0.4.1", features = ["futures-traits"] }
 prost = "0.8"
 rand = "0.8"
 rsa = "0.4"

crates/rpc/src/conn.rs 🔗

@@ -35,21 +35,24 @@ impl Connection {
     #[cfg(any(test, feature = "test-support"))]
     pub fn in_memory(
         executor: std::sync::Arc<gpui::executor::Background>,
-    ) -> (Self, Self, postage::barrier::Sender) {
-        use postage::prelude::Stream;
+    ) -> (Self, Self, std::sync::Arc<std::sync::atomic::AtomicBool>) {
+        use std::sync::{
+            atomic::{AtomicBool, Ordering::SeqCst},
+            Arc,
+        };
 
-        let (kill_tx, kill_rx) = postage::barrier::channel();
-        let (a_tx, a_rx) = channel(kill_rx.clone(), executor.clone());
-        let (b_tx, b_rx) = channel(kill_rx, executor);
+        let killed = Arc::new(AtomicBool::new(false));
+        let (a_tx, a_rx) = channel(killed.clone(), executor.clone());
+        let (b_tx, b_rx) = channel(killed.clone(), executor);
         return (
             Self { tx: a_tx, rx: b_rx },
             Self { tx: b_tx, rx: a_rx },
-            kill_tx,
+            killed,
         );
 
         fn channel(
-            kill_rx: postage::barrier::Receiver,
-            executor: std::sync::Arc<gpui::executor::Background>,
+            killed: Arc<AtomicBool>,
+            executor: Arc<gpui::executor::Background>,
         ) -> (
             Box<dyn Send + Unpin + futures::Sink<WebSocketMessage, Error = WebSocketError>>,
             Box<
@@ -57,20 +60,17 @@ impl Connection {
             >,
         ) {
             use futures::channel::mpsc;
-            use std::{
-                io::{Error, ErrorKind},
-                sync::Arc,
-            };
+            use std::io::{Error, ErrorKind};
 
             let (tx, rx) = mpsc::unbounded::<WebSocketMessage>();
 
             let tx = tx
                 .sink_map_err(|e| WebSocketError::from(Error::new(ErrorKind::Other, e)))
                 .with({
-                    let kill_rx = kill_rx.clone();
+                    let killed = killed.clone();
                     let executor = Arc::downgrade(&executor);
                     move |msg| {
-                        let mut kill_rx = kill_rx.clone();
+                        let killed = killed.clone();
                         let executor = executor.clone();
                         Box::pin(async move {
                             if let Some(executor) = executor.upgrade() {
@@ -78,7 +78,7 @@ impl Connection {
                             }
 
                             // Writes to a half-open TCP connection will error.
-                            if kill_rx.try_recv().is_ok() {
+                            if killed.load(SeqCst) {
                                 std::io::Result::Err(
                                     Error::new(ErrorKind::Other, "connection lost").into(),
                                 )?;
@@ -90,10 +90,10 @@ impl Connection {
                 });
 
             let rx = rx.then({
-                let kill_rx = kill_rx.clone();
+                let killed = killed.clone();
                 let executor = Arc::downgrade(&executor);
                 move |msg| {
-                    let mut kill_rx = kill_rx.clone();
+                    let killed = killed.clone();
                     let executor = executor.clone();
                     Box::pin(async move {
                         if let Some(executor) = executor.upgrade() {
@@ -101,7 +101,7 @@ impl Connection {
                         }
 
                         // Reads from a half-open TCP connection will hang.
-                        if kill_rx.try_recv().is_ok() {
+                        if killed.load(SeqCst) {
                             futures::future::pending::<()>().await;
                         }
 

crates/rpc/src/peer.rs 🔗

@@ -4,13 +4,13 @@ use super::{
 };
 use anyhow::{anyhow, Context, Result};
 use collections::HashMap;
-use futures::{channel::oneshot, stream::BoxStream, FutureExt as _, StreamExt};
-use parking_lot::{Mutex, RwLock};
-use postage::{
-    barrier, mpsc,
-    prelude::{Sink as _, Stream as _},
+use futures::{
+    channel::{mpsc, oneshot},
+    stream::BoxStream,
+    FutureExt, SinkExt, StreamExt,
 };
-use smol_timeout::TimeoutExt as _;
+use parking_lot::{Mutex, RwLock};
+use smol_timeout::TimeoutExt;
 use std::sync::atomic::Ordering::SeqCst;
 use std::{
     fmt,
@@ -90,10 +90,10 @@ pub struct Peer {
 
 #[derive(Clone)]
 pub struct ConnectionState {
-    outgoing_tx: futures::channel::mpsc::UnboundedSender<proto::Message>,
+    outgoing_tx: mpsc::UnboundedSender<proto::Message>,
     next_message_id: Arc<AtomicU32>,
     response_channels:
-        Arc<Mutex<Option<HashMap<u32, oneshot::Sender<(proto::Envelope, barrier::Sender)>>>>>,
+        Arc<Mutex<Option<HashMap<u32, oneshot::Sender<(proto::Envelope, oneshot::Sender<()>)>>>>>,
 }
 
 const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);
@@ -127,7 +127,7 @@ impl Peer {
         // bounded channel so that other peers will receive backpressure if they send
         // messages faster than this peer can process them.
         let (mut incoming_tx, incoming_rx) = mpsc::channel(64);
-        let (outgoing_tx, mut outgoing_rx) = futures::channel::mpsc::unbounded();
+        let (outgoing_tx, mut outgoing_rx) = mpsc::unbounded();
 
         let connection_id = ConnectionId(self.next_connection_id.fetch_add(1, SeqCst));
         let connection_state = ConnectionState {
@@ -208,14 +208,14 @@ impl Peer {
                 if let Some(responding_to) = incoming.responding_to {
                     let channel = response_channels.lock().as_mut()?.remove(&responding_to);
                     if let Some(tx) = channel {
-                        let mut requester_resumed = barrier::channel();
+                        let requester_resumed = oneshot::channel();
                         if let Err(error) = tx.send((incoming, requester_resumed.0)) {
                             log::debug!(
                                 "received RPC but request future was dropped {:?}",
                                 error.0
                             );
                         }
-                        requester_resumed.1.recv().await;
+                        let _ = requester_resumed.1.await;
                     } else {
                         log::warn!("received RPC response to unknown request {}", responding_to);
                     }
@@ -721,26 +721,26 @@ mod tests {
             .add_test_connection(client_conn, cx.background())
             .await;
 
-        let (mut io_ended_tx, mut io_ended_rx) = postage::barrier::channel();
+        let (io_ended_tx, io_ended_rx) = oneshot::channel();
         executor
             .spawn(async move {
                 io_handler.await.ok();
-                io_ended_tx.send(()).await.unwrap();
+                io_ended_tx.send(()).unwrap();
             })
             .detach();
 
-        let (mut messages_ended_tx, mut messages_ended_rx) = postage::barrier::channel();
+        let (messages_ended_tx, messages_ended_rx) = oneshot::channel();
         executor
             .spawn(async move {
                 incoming.next().await;
-                messages_ended_tx.send(()).await.unwrap();
+                messages_ended_tx.send(()).unwrap();
             })
             .detach();
 
         client.disconnect(connection_id);
 
-        io_ended_rx.recv().await;
-        messages_ended_rx.recv().await;
+        let _ = io_ended_rx.await;
+        let _ = messages_ended_rx.await;
         assert!(server_conn
             .send(WebSocketMessage::Binary(vec![]))
             .await

crates/server/src/rpc.rs 🔗

@@ -1094,7 +1094,6 @@ mod tests {
     };
     use lsp::{self, FakeLanguageServer};
     use parking_lot::Mutex;
-    use postage::barrier;
     use project::{
         fs::{FakeFs, Fs as _},
         search::SearchQuery,
@@ -5350,7 +5349,7 @@ mod tests {
         server: Arc<Server>,
         foreground: Rc<executor::Foreground>,
         notifications: mpsc::UnboundedReceiver<()>,
-        connection_killers: Arc<Mutex<HashMap<UserId, barrier::Sender>>>,
+        connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
         forbid_connections: Arc<AtomicBool>,
         _test_db: TestDb,
     }
@@ -5418,9 +5417,9 @@ mod tests {
                                 "server is forbidding connections"
                             )))
                         } else {
-                            let (client_conn, server_conn, kill_conn) =
+                            let (client_conn, server_conn, killed) =
                                 Connection::in_memory(cx.background());
-                            connection_killers.lock().insert(user_id, kill_conn);
+                            connection_killers.lock().insert(user_id, killed);
                             cx.background()
                                 .spawn(server.handle_connection(
                                     server_conn,
@@ -5462,7 +5461,11 @@ mod tests {
         }
 
         fn disconnect_client(&self, user_id: UserId) {
-            self.connection_killers.lock().remove(&user_id);
+            self.connection_killers
+                .lock()
+                .remove(&user_id)
+                .unwrap()
+                .store(true, SeqCst);
         }
 
         fn forbid_connections(&self) {