Start work on compressing RPC messages

Max Brunsfeld and Antonio Scandurra created

Co-Authored-By: Antonio Scandurra <me@as-cii.com>

Change summary

Cargo.lock          | 42 ++++++++++++++++++++++++++++++++++++++++++
zed/src/worktree.rs | 16 ++++++++++++----
zrpc/Cargo.toml     |  1 +
zrpc/src/proto.rs   | 32 ++++++++++++++++++++++++++++----
4 files changed, 83 insertions(+), 8 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -867,6 +867,9 @@ name = "cc"
 version = "1.0.67"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e3c69b077ad434294d3ce9f1f6143a2a4b89a8a2d54ef813d85003a4fd1137fd"
+dependencies = [
+ "jobserver",
+]
 
 [[package]]
 name = "cexpr"
@@ -2614,6 +2617,15 @@ version = "0.4.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736"
 
+[[package]]
+name = "jobserver"
+version = "0.1.24"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "af25a77299a7f711a01975c35a6a424eb6862092cc2d6c72c4ed6cbc56dfc1fa"
+dependencies = [
+ "libc",
+]
+
 [[package]]
 name = "jpeg-decoder"
 version = "0.1.22"
@@ -6037,4 +6049,34 @@ dependencies = [
  "serde 1.0.125",
  "smol",
  "tempdir",
+ "zstd",
+]
+
+[[package]]
+name = "zstd"
+version = "0.9.0+zstd.1.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "07749a5dc2cb6b36661290245e350f15ec3bbb304e493db54a1d354480522ccd"
+dependencies = [
+ "zstd-safe",
+]
+
+[[package]]
+name = "zstd-safe"
+version = "4.1.1+zstd.1.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c91c90f2c593b003603e5e0493c837088df4469da25aafff8bce42ba48caf079"
+dependencies = [
+ "libc",
+ "zstd-sys",
+]
+
+[[package]]
+name = "zstd-sys"
+version = "1.6.1+zstd.1.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "615120c7a2431d16cf1cf979e7fc31ba7a5b5e5707b29c8a99e5dbf8a8392a33"
+dependencies = [
+ "cc",
+ "libc",
 ]

zed/src/worktree.rs 🔗

@@ -42,7 +42,7 @@ use std::{
         atomic::{AtomicUsize, Ordering::SeqCst},
         Arc,
     },
-    time::{Duration, SystemTime},
+    time::{Duration, Instant, SystemTime},
 };
 use zrpc::{PeerId, TypedEnvelope};
 
@@ -1072,9 +1072,14 @@ impl LocalWorktree {
             };
 
             let remote_id = share_request.worktree.as_ref().unwrap().id;
+            let t0 = Instant::now();
             let share_response = rpc.request(share_request).await?;
 
-            log::info!("sharing worktree {:?}", share_response);
+            log::info!(
+                "sharing worktree {:?} - took {:?}",
+                share_response,
+                t0.elapsed()
+            );
             let (snapshots_to_send_tx, snapshots_to_send_rx) =
                 smol::channel::unbounded::<Snapshot>();
 
@@ -1137,7 +1142,8 @@ impl LocalWorktree {
         let snapshot = self.snapshot();
         let root_name = self.root_name.clone();
         cx.background().spawn(async move {
-            remote_id.await.map(|id| {
+            let t0 = Instant::now();
+            let result = remote_id.await.map(|id| {
                 let entries = snapshot
                     .entries_by_path
                     .cursor::<(), ()>()
@@ -1151,7 +1157,9 @@ impl LocalWorktree {
                         entries,
                     }),
                 }
-            })
+            });
+            eprintln!("computing share request: {:?}", t0.elapsed());
+            result
         })
     }
 }

zrpc/Cargo.toml 🔗

@@ -20,6 +20,7 @@ prost = "0.7"
 rand = "0.8"
 rsa = "0.4"
 serde = { version = "1", features = ["derive"] }
+zstd = "0.9"
 
 [build-dependencies]
 prost-build = { git = "https://github.com/tokio-rs/prost", rev = "6cf97ea422b09d98de34643c4dda2d4f8b7e23e6" }

zrpc/src/proto.rs 🔗

@@ -4,6 +4,7 @@ use async_tungstenite::tungstenite::{Error as WebSocketError, Message as WebSock
 use futures::{SinkExt as _, StreamExt as _};
 use prost::Message;
 use std::any::{Any, TypeId};
+use std::time::Instant;
 use std::{
     io,
     time::{Duration, SystemTime, UNIX_EPOCH},
@@ -192,11 +193,15 @@ entity_messages!(channel_id, ChannelMessageSent);
 /// A stream of protobuf messages.
 pub struct MessageStream<S> {
     stream: S,
+    encoding_buffer: Vec<u8>,
 }
 
 impl<S> MessageStream<S> {
     pub fn new(stream: S) -> Self {
-        Self { stream }
+        Self {
+            stream,
+            encoding_buffer: Vec::new(),
+        }
     }
 
     pub fn inner_mut(&mut self) -> &mut S {
@@ -210,10 +215,19 @@ where
 {
     /// Write a given protobuf message to the stream.
     pub async fn write_message(&mut self, message: &Envelope) -> Result<(), WebSocketError> {
-        let mut buffer = Vec::with_capacity(message.encoded_len());
+        self.encoding_buffer.resize(message.encoded_len(), 0);
+        self.encoding_buffer.clear();
         message
-            .encode(&mut buffer)
+            .encode(&mut self.encoding_buffer)
             .map_err(|err| io::Error::from(err))?;
+        let t0 = Instant::now();
+        let buffer = zstd::stream::encode_all(self.encoding_buffer.as_slice(), 4).unwrap();
+        eprintln!(
+            "write_message. len: {}, compressed_len: {}, compression time: {:?}",
+            message.encoded_len(),
+            buffer.len(),
+            t0.elapsed(),
+        );
         self.stream.send(WebSocketMessage::Binary(buffer)).await?;
         Ok(())
     }
@@ -228,7 +242,17 @@ where
         while let Some(bytes) = self.stream.next().await {
             match bytes? {
                 WebSocketMessage::Binary(bytes) => {
-                    let envelope = Envelope::decode(bytes.as_slice()).map_err(io::Error::from)?;
+                    let t0 = Instant::now();
+                    self.encoding_buffer.clear();
+                    zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
+                    eprintln!(
+                        "read_message. len: {}, compressed_len: {}, decompression time: {:?}",
+                        self.encoding_buffer.len(),
+                        bytes.len(),
+                        t0.elapsed()
+                    );
+                    let envelope = Envelope::decode(self.encoding_buffer.as_slice())
+                        .map_err(io::Error::from)?;
                     return Ok(envelope);
                 }
                 WebSocketMessage::Close(_) => break,