From 8bfee93be43eccc534f9c0de08db795589938e7f Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Wed, 22 Sep 2021 11:58:23 -0700 Subject: [PATCH 1/3] Start work on compressing RPC messages Co-Authored-By: Antonio Scandurra --- Cargo.lock | 42 ++++++++++++++++++++++++++++++++++++++++++ zed/src/worktree.rs | 16 ++++++++++++---- zrpc/Cargo.toml | 1 + zrpc/src/proto.rs | 32 ++++++++++++++++++++++++++++---- 4 files changed, 83 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7973316c9e337532685bfc546f22dae9d8a7bee1..483cf758880de306df5a599a61353769ced2f43c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -867,6 +867,9 @@ name = "cc" version = "1.0.67" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3c69b077ad434294d3ce9f1f6143a2a4b89a8a2d54ef813d85003a4fd1137fd" +dependencies = [ + "jobserver", +] [[package]] name = "cexpr" @@ -2614,6 +2617,15 @@ version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736" +[[package]] +name = "jobserver" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af25a77299a7f711a01975c35a6a424eb6862092cc2d6c72c4ed6cbc56dfc1fa" +dependencies = [ + "libc", +] + [[package]] name = "jpeg-decoder" version = "0.1.22" @@ -6037,4 +6049,34 @@ dependencies = [ "serde 1.0.125", "smol", "tempdir", + "zstd", +] + +[[package]] +name = "zstd" +version = "0.9.0+zstd.1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07749a5dc2cb6b36661290245e350f15ec3bbb304e493db54a1d354480522ccd" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "4.1.1+zstd.1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c91c90f2c593b003603e5e0493c837088df4469da25aafff8bce42ba48caf079" +dependencies = [ + "libc", + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "1.6.1+zstd.1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "615120c7a2431d16cf1cf979e7fc31ba7a5b5e5707b29c8a99e5dbf8a8392a33" +dependencies = [ + "cc", + "libc", ] diff --git a/zed/src/worktree.rs b/zed/src/worktree.rs index 15cfd29463747ccc4e0b25b50f7cb6fc014f9cb2..52def5ec275bc7852161ab54769d98317bc51beb 100644 --- a/zed/src/worktree.rs +++ b/zed/src/worktree.rs @@ -42,7 +42,7 @@ use std::{ atomic::{AtomicUsize, Ordering::SeqCst}, Arc, }, - time::{Duration, SystemTime}, + time::{Duration, Instant, SystemTime}, }; use zrpc::{PeerId, TypedEnvelope}; @@ -1072,9 +1072,14 @@ impl LocalWorktree { }; let remote_id = share_request.worktree.as_ref().unwrap().id; + let t0 = Instant::now(); let share_response = rpc.request(share_request).await?; - log::info!("sharing worktree {:?}", share_response); + log::info!( + "sharing worktree {:?} - took {:?}", + share_response, + t0.elapsed() + ); let (snapshots_to_send_tx, snapshots_to_send_rx) = smol::channel::unbounded::(); @@ -1137,7 +1142,8 @@ impl LocalWorktree { let snapshot = self.snapshot(); let root_name = self.root_name.clone(); cx.background().spawn(async move { - remote_id.await.map(|id| { + let t0 = Instant::now(); + let result = remote_id.await.map(|id| { let entries = snapshot .entries_by_path .cursor::<(), ()>() @@ -1151,7 +1157,9 @@ impl LocalWorktree { entries, }), } - }) + }); + eprintln!("computing share request: {:?}", t0.elapsed()); + result }) } } diff --git a/zrpc/Cargo.toml b/zrpc/Cargo.toml index 5d78f46bb13591ccde8ea93633ea386026596c62..ee1b1433daad18c1f77a98c1b2db554173b399bd 100644 --- a/zrpc/Cargo.toml +++ b/zrpc/Cargo.toml @@ -20,6 +20,7 @@ prost = "0.7" rand = "0.8" rsa = "0.4" serde = { version = "1", features = ["derive"] } +zstd = "0.9" [build-dependencies] prost-build = { git = "https://github.com/tokio-rs/prost", rev = "6cf97ea422b09d98de34643c4dda2d4f8b7e23e6" } diff --git a/zrpc/src/proto.rs b/zrpc/src/proto.rs index 92fca53e28335680f2cf7227e0eea32a68a54e8b..9a36454abb9b9224e2b8d39e0c0efc641017fb2c 100644 --- a/zrpc/src/proto.rs +++ b/zrpc/src/proto.rs @@ -4,6 +4,7 @@ use async_tungstenite::tungstenite::{Error as WebSocketError, Message as WebSock use futures::{SinkExt as _, StreamExt as _}; use prost::Message; use std::any::{Any, TypeId}; +use std::time::Instant; use std::{ io, time::{Duration, SystemTime, UNIX_EPOCH}, @@ -192,11 +193,15 @@ entity_messages!(channel_id, ChannelMessageSent); /// A stream of protobuf messages. pub struct MessageStream { stream: S, + encoding_buffer: Vec, } impl MessageStream { pub fn new(stream: S) -> Self { - Self { stream } + Self { + stream, + encoding_buffer: Vec::new(), + } } pub fn inner_mut(&mut self) -> &mut S { @@ -210,10 +215,19 @@ where { /// Write a given protobuf message to the stream. pub async fn write_message(&mut self, message: &Envelope) -> Result<(), WebSocketError> { - let mut buffer = Vec::with_capacity(message.encoded_len()); + self.encoding_buffer.resize(message.encoded_len(), 0); + self.encoding_buffer.clear(); message - .encode(&mut buffer) + .encode(&mut self.encoding_buffer) .map_err(|err| io::Error::from(err))?; + let t0 = Instant::now(); + let buffer = zstd::stream::encode_all(self.encoding_buffer.as_slice(), 4).unwrap(); + eprintln!( + "write_message. len: {}, compressed_len: {}, compression time: {:?}", + message.encoded_len(), + buffer.len(), + t0.elapsed(), + ); self.stream.send(WebSocketMessage::Binary(buffer)).await?; Ok(()) } @@ -228,7 +242,17 @@ where while let Some(bytes) = self.stream.next().await { match bytes? { WebSocketMessage::Binary(bytes) => { - let envelope = Envelope::decode(bytes.as_slice()).map_err(io::Error::from)?; + let t0 = Instant::now(); + self.encoding_buffer.clear(); + zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap(); + eprintln!( + "read_message. len: {}, compressed_len: {}, decompression time: {:?}", + self.encoding_buffer.len(), + bytes.len(), + t0.elapsed() + ); + let envelope = Envelope::decode(self.encoding_buffer.as_slice()) + .map_err(io::Error::from)?; return Ok(envelope); } WebSocketMessage::Close(_) => break, From 5b40dcaeedc58c3eba3095d71b8fd12a3af17deb Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Thu, 23 Sep 2021 16:54:42 +0200 Subject: [PATCH 2/3] Remove stray logging --- zed/src/worktree.rs | 16 ++++------------ zrpc/src/proto.rs | 15 --------------- 2 files changed, 4 insertions(+), 27 deletions(-) diff --git a/zed/src/worktree.rs b/zed/src/worktree.rs index 52def5ec275bc7852161ab54769d98317bc51beb..15cfd29463747ccc4e0b25b50f7cb6fc014f9cb2 100644 --- a/zed/src/worktree.rs +++ b/zed/src/worktree.rs @@ -42,7 +42,7 @@ use std::{ atomic::{AtomicUsize, Ordering::SeqCst}, Arc, }, - time::{Duration, Instant, SystemTime}, + time::{Duration, SystemTime}, }; use zrpc::{PeerId, TypedEnvelope}; @@ -1072,14 +1072,9 @@ impl LocalWorktree { }; let remote_id = share_request.worktree.as_ref().unwrap().id; - let t0 = Instant::now(); let share_response = rpc.request(share_request).await?; - log::info!( - "sharing worktree {:?} - took {:?}", - share_response, - t0.elapsed() - ); + log::info!("sharing worktree {:?}", share_response); let (snapshots_to_send_tx, snapshots_to_send_rx) = smol::channel::unbounded::(); @@ -1142,8 +1137,7 @@ impl LocalWorktree { let snapshot = self.snapshot(); let root_name = self.root_name.clone(); cx.background().spawn(async move { - let t0 = Instant::now(); - let result = remote_id.await.map(|id| { + remote_id.await.map(|id| { let entries = snapshot .entries_by_path .cursor::<(), ()>() @@ -1157,9 +1151,7 @@ impl LocalWorktree { entries, }), } - }); - eprintln!("computing share request: {:?}", t0.elapsed()); - result + }) }) } } diff --git a/zrpc/src/proto.rs b/zrpc/src/proto.rs index 9a36454abb9b9224e2b8d39e0c0efc641017fb2c..e9de319a1c6abf8c23efcbfd3e47622d133908dc 100644 --- a/zrpc/src/proto.rs +++ b/zrpc/src/proto.rs @@ -4,7 +4,6 @@ use async_tungstenite::tungstenite::{Error as WebSocketError, Message as WebSock use futures::{SinkExt as _, StreamExt as _}; use prost::Message; use std::any::{Any, TypeId}; -use std::time::Instant; use std::{ io, time::{Duration, SystemTime, UNIX_EPOCH}, @@ -220,14 +219,7 @@ where message .encode(&mut self.encoding_buffer) .map_err(|err| io::Error::from(err))?; - let t0 = Instant::now(); let buffer = zstd::stream::encode_all(self.encoding_buffer.as_slice(), 4).unwrap(); - eprintln!( - "write_message. len: {}, compressed_len: {}, compression time: {:?}", - message.encoded_len(), - buffer.len(), - t0.elapsed(), - ); self.stream.send(WebSocketMessage::Binary(buffer)).await?; Ok(()) } @@ -242,15 +234,8 @@ where while let Some(bytes) = self.stream.next().await { match bytes? { WebSocketMessage::Binary(bytes) => { - let t0 = Instant::now(); self.encoding_buffer.clear(); zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap(); - eprintln!( - "read_message. len: {}, compressed_len: {}, decompression time: {:?}", - self.encoding_buffer.len(), - bytes.len(), - t0.elapsed() - ); let envelope = Envelope::decode(self.encoding_buffer.as_slice()) .map_err(io::Error::from)?; return Ok(envelope); From 96961a7dfe2778f6e6096346647fa1fb972bccc5 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Thu, 23 Sep 2021 16:55:09 +0200 Subject: [PATCH 3/3] Bump zrpc version to 1 --- zrpc/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zrpc/src/lib.rs b/zrpc/src/lib.rs index 7e32da83c56c4531002a9a662092ab0caa093d2b..ccaf50135003576fd98c43cf26aff4e5202e7621 100644 --- a/zrpc/src/lib.rs +++ b/zrpc/src/lib.rs @@ -5,4 +5,4 @@ pub mod proto; pub use conn::Connection; pub use peer::*; -pub const PROTOCOL_VERSION: u32 = 0; +pub const PROTOCOL_VERSION: u32 = 1;