Detailed changes
@@ -867,6 +867,9 @@ name = "cc"
version = "1.0.67"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3c69b077ad434294d3ce9f1f6143a2a4b89a8a2d54ef813d85003a4fd1137fd"
+dependencies = [
+ "jobserver",
+]
[[package]]
name = "cexpr"
@@ -2614,6 +2617,15 @@ version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736"
+[[package]]
+name = "jobserver"
+version = "0.1.24"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "af25a77299a7f711a01975c35a6a424eb6862092cc2d6c72c4ed6cbc56dfc1fa"
+dependencies = [
+ "libc",
+]
+
[[package]]
name = "jpeg-decoder"
version = "0.1.22"
@@ -6037,4 +6049,34 @@ dependencies = [
"serde 1.0.125",
"smol",
"tempdir",
+ "zstd",
+]
+
+[[package]]
+name = "zstd"
+version = "0.9.0+zstd.1.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "07749a5dc2cb6b36661290245e350f15ec3bbb304e493db54a1d354480522ccd"
+dependencies = [
+ "zstd-safe",
+]
+
+[[package]]
+name = "zstd-safe"
+version = "4.1.1+zstd.1.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c91c90f2c593b003603e5e0493c837088df4469da25aafff8bce42ba48caf079"
+dependencies = [
+ "libc",
+ "zstd-sys",
+]
+
+[[package]]
+name = "zstd-sys"
+version = "1.6.1+zstd.1.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "615120c7a2431d16cf1cf979e7fc31ba7a5b5e5707b29c8a99e5dbf8a8392a33"
+dependencies = [
+ "cc",
+ "libc",
]
@@ -20,6 +20,7 @@ prost = "0.7"
rand = "0.8"
rsa = "0.4"
serde = { version = "1", features = ["derive"] }
+zstd = "0.9"
[build-dependencies]
prost-build = { git = "https://github.com/tokio-rs/prost", rev = "6cf97ea422b09d98de34643c4dda2d4f8b7e23e6" }
@@ -5,4 +5,4 @@ pub mod proto;
pub use conn::Connection;
pub use peer::*;
-pub const PROTOCOL_VERSION: u32 = 0;
+pub const PROTOCOL_VERSION: u32 = 1;
@@ -192,11 +192,15 @@ entity_messages!(channel_id, ChannelMessageSent);
/// A stream of protobuf messages.
pub struct MessageStream<S> {
stream: S,
+ encoding_buffer: Vec<u8>,
}
impl<S> MessageStream<S> {
pub fn new(stream: S) -> Self {
- Self { stream }
+ Self {
+ stream,
+ encoding_buffer: Vec::new(),
+ }
}
pub fn inner_mut(&mut self) -> &mut S {
@@ -210,10 +214,12 @@ where
{
/// Write a given protobuf message to the stream.
pub async fn write_message(&mut self, message: &Envelope) -> Result<(), WebSocketError> {
- let mut buffer = Vec::with_capacity(message.encoded_len());
+ self.encoding_buffer.resize(message.encoded_len(), 0);
+ self.encoding_buffer.clear();
message
- .encode(&mut buffer)
+ .encode(&mut self.encoding_buffer)
.map_err(|err| io::Error::from(err))?;
+ let buffer = zstd::stream::encode_all(self.encoding_buffer.as_slice(), 4).unwrap();
self.stream.send(WebSocketMessage::Binary(buffer)).await?;
Ok(())
}
@@ -228,7 +234,10 @@ where
while let Some(bytes) = self.stream.next().await {
match bytes? {
WebSocketMessage::Binary(bytes) => {
- let envelope = Envelope::decode(bytes.as_slice()).map_err(io::Error::from)?;
+ self.encoding_buffer.clear();
+ zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
+ let envelope = Envelope::decode(self.encoding_buffer.as_slice())
+ .map_err(io::Error::from)?;
return Ok(envelope);
}
WebSocketMessage::Close(_) => break,