Cap `MessageStream` buffer size to 1MB

Antonio Scandurra created

We temporarily let it grow when the message size exceed the limit,
but restore the buffer's capacity shortly after. This ensures that,
for each connection in its entire lifetime, we only ever use 1MB.

Change summary

crates/auto_update/Cargo.toml    |  2 
crates/cli/Cargo.toml            |  2 
crates/collab/Cargo.toml         |  2 
crates/contacts_panel/Cargo.toml |  2 
crates/editor/Cargo.toml         |  2 
crates/gpui/Cargo.toml           |  2 
crates/language/Cargo.toml       |  2 
crates/lsp/Cargo.toml            |  2 
crates/project/Cargo.toml        |  2 
crates/rpc/Cargo.toml            |  2 
crates/rpc/src/proto.rs          | 49 +++++++++++++++++++++++++++++++--
crates/search/Cargo.toml         |  2 
crates/settings/Cargo.toml       |  2 
crates/theme/Cargo.toml          |  2 
crates/vim/Cargo.toml            |  2 
crates/workspace/Cargo.toml      |  2 
crates/zed/Cargo.toml            |  2 
17 files changed, 62 insertions(+), 19 deletions(-)

Detailed changes

crates/auto_update/Cargo.toml 🔗

@@ -17,7 +17,7 @@ anyhow = "1.0.38"
 isahc = "1.7"
 lazy_static = "1.4"
 log = "0.4"
-serde = { version = "1", features = ["derive"] }
+serde = { version = "1.0", features = ["derive", "rc"] }
 serde_json = { version = "1.0", features = ["preserve_order"] }
 smol = "1.2.5"
 tempdir = "0.3.7"

crates/cli/Cargo.toml 🔗

@@ -16,7 +16,7 @@ anyhow = "1.0"
 clap = { version = "3.1", features = ["derive"] }
 dirs = "3.0"
 ipc-channel = "0.16"
-serde = { version = "1.0", features = ["derive"] }
+serde = { version = "1.0", features = ["derive", "rc"] }
 
 [target.'cfg(target_os = "macos")'.dependencies]
 core-foundation = "0.9"

crates/collab/Cargo.toml 🔗

@@ -34,7 +34,7 @@ parking_lot = "0.11.1"
 rand = "0.8"
 reqwest = { version = "0.11", features = ["json"], optional = true }
 scrypt = "0.7"
-serde = { version = "1.0", features = ["derive"] }
+serde = { version = "1.0", features = ["derive", "rc"] }
 serde_json = "1.0"
 sha-1 = "0.9"
 time = "0.2"

crates/contacts_panel/Cargo.toml 🔗

@@ -23,7 +23,7 @@ anyhow = "1.0"
 futures = "0.3"
 log = "0.4"
 postage = { version = "0.4.1", features = ["futures-traits"] }
-serde = { version = "1", features = ["derive"] }
+serde = { version = "1.0", features = ["derive", "rc"] }
 
 [dev-dependencies]
 language = { path = "../language", features = ["test-support"] }

crates/editor/Cargo.toml 🔗

@@ -45,7 +45,7 @@ ordered-float = "2.1.1"
 parking_lot = "0.11"
 postage = { version = "0.4", features = ["futures-traits"] }
 rand = { version = "0.8.3", optional = true }
-serde = { version = "1", features = ["derive", "rc"] }
+serde = { version = "1.0", features = ["derive", "rc"] }
 smallvec = { version = "1.6", features = ["union"] }
 smol = "1.2"
 

crates/gpui/Cargo.toml 🔗

@@ -36,7 +36,7 @@ postage = { version = "0.4.1", features = ["futures-traits"] }
 rand = "0.8.3"
 resvg = "0.14"
 seahash = "4.1"
-serde = { version = "1.0.125", features = ["derive"] }
+serde = { version = "1.0", features = ["derive", "rc"] }
 serde_json = "1.0"
 smallvec = { version = "1.6", features = ["union"] }
 smol = "1.2"

crates/language/Cargo.toml 🔗

@@ -40,7 +40,7 @@ log = { version = "0.4.16", features = ["kv_unstable_serde"] }
 parking_lot = "0.11.1"
 postage = { version = "0.4.1", features = ["futures-traits"] }
 rand = { version = "0.8.3", optional = true }
-serde = { version = "1", features = ["derive"] }
+serde = { version = "1.0", features = ["derive", "rc"] }
 serde_json = { version = "1", features = ["preserve_order"] }
 similar = "1.3"
 smallvec = { version = "1.6", features = ["union"] }

crates/lsp/Cargo.toml 🔗

@@ -21,7 +21,7 @@ log = { version = "0.4.16", features = ["kv_unstable_serde"] }
 lsp-types = "0.91"
 parking_lot = "0.11"
 postage = { version = "0.4.1", features = ["futures-traits"] }
-serde = { version = "1.0", features = ["derive"] }
+serde = { version = "1.0", features = ["derive", "rc"] }
 serde_json = { version = "1.0", features = ["raw_value"] }
 smol = "1.2"
 

crates/project/Cargo.toml 🔗

@@ -40,7 +40,7 @@ parking_lot = "0.11.1"
 postage = { version = "0.4.1", features = ["futures-traits"] }
 rand = "0.8.3"
 regex = "1.5"
-serde = { version = "1", features = ["derive"] }
+serde = { version = "1.0", features = ["derive", "rc"] }
 serde_json = { version = "1.0", features = ["preserve_order"] }
 sha2 = "0.10"
 similar = "1.3"

crates/rpc/Cargo.toml 🔗

@@ -25,7 +25,7 @@ parking_lot = "0.11.1"
 prost = "0.8"
 rand = "0.8"
 rsa = "0.4"
-serde = { version = "1", features = ["derive"] }
+serde = { version = "1.0", features = ["derive", "rc"] }
 smol-timeout = "0.6"
 tracing = { version = "0.1.34", features = ["log"] }
 zstd = "0.9"

crates/rpc/src/proto.rs 🔗

@@ -254,6 +254,8 @@ entity_messages!(
 
 entity_messages!(channel_id, ChannelMessageSent);
 
+const MAX_BUFFER_LEN: usize = 1 * 1024 * 1024;
+
 /// A stream of protobuf messages.
 pub struct MessageStream<S> {
     stream: S,
@@ -293,14 +295,16 @@ where
 
         match message {
             Message::Envelope(message) => {
-                self.encoding_buffer.resize(message.encoded_len(), 0);
-                self.encoding_buffer.clear();
+                self.encoding_buffer.reserve(message.encoded_len());
                 message
                     .encode(&mut self.encoding_buffer)
                     .map_err(|err| io::Error::from(err))?;
                 let buffer =
                     zstd::stream::encode_all(self.encoding_buffer.as_slice(), COMPRESSION_LEVEL)
                         .unwrap();
+
+                self.encoding_buffer.clear();
+                self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
                 self.stream.send(WebSocketMessage::Binary(buffer)).await?;
             }
             Message::Ping => {
@@ -327,10 +331,12 @@ where
         while let Some(bytes) = self.stream.next().await {
             match bytes? {
                 WebSocketMessage::Binary(bytes) => {
-                    self.encoding_buffer.clear();
                     zstd::stream::copy_decode(bytes.as_slice(), &mut self.encoding_buffer).unwrap();
                     let envelope = Envelope::decode(self.encoding_buffer.as_slice())
                         .map_err(io::Error::from)?;
+
+                    self.encoding_buffer.clear();
+                    self.encoding_buffer.shrink_to(MAX_BUFFER_LEN);
                     return Ok(Message::Envelope(envelope));
                 }
                 WebSocketMessage::Ping(_) => return Ok(Message::Ping),
@@ -379,3 +385,40 @@ impl From<Nonce> for u128 {
         upper_half | lower_half
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[gpui::test]
+    async fn test_buffer_size() {
+        let (tx, rx) = futures::channel::mpsc::unbounded();
+        let mut sink = MessageStream::new(tx.sink_map_err(|_| anyhow!("")));
+        sink.write(Message::Envelope(Envelope {
+            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
+                root_name: "abcdefg".repeat(10),
+                ..Default::default()
+            })),
+            ..Default::default()
+        }))
+        .await
+        .unwrap();
+        assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
+        sink.write(Message::Envelope(Envelope {
+            payload: Some(envelope::Payload::UpdateWorktree(UpdateWorktree {
+                root_name: "abcdefg".repeat(1000000),
+                ..Default::default()
+            })),
+            ..Default::default()
+        }))
+        .await
+        .unwrap();
+        assert!(sink.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
+
+        let mut stream = MessageStream::new(rx.map(|msg| anyhow::Ok(msg)));
+        stream.read().await.unwrap();
+        assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
+        stream.read().await.unwrap();
+        assert!(stream.encoding_buffer.capacity() <= MAX_BUFFER_LEN);
+    }
+}

crates/search/Cargo.toml 🔗

@@ -21,7 +21,7 @@ workspace = { path = "../workspace" }
 anyhow = "1.0"
 log = { version = "0.4.16", features = ["kv_unstable_serde"] }
 postage = { version = "0.4.1", features = ["futures-traits"] }
-serde = { version = "1", features = ["derive"] }
+serde = { version = "1.0", features = ["derive", "rc"] }
 smallvec = { version = "1.6", features = ["union"] }
 
 [dev-dependencies]

crates/settings/Cargo.toml 🔗

@@ -19,7 +19,7 @@ util = { path = "../util" }
 anyhow = "1.0.38"
 json_comments = "0.2"
 schemars = "0.8"
-serde = { version = "1", features = ["derive", "rc"] }
+serde = { version = "1.0", features = ["derive", "rc"] }
 serde_json = { version = "1.0", features = ["preserve_order"] }
 serde_path_to_error = "0.1.4"
 toml = "0.5"

crates/theme/Cargo.toml 🔗

@@ -12,7 +12,7 @@ gpui = { path = "../gpui" }
 anyhow = "1.0.38"
 indexmap = "1.6.2"
 parking_lot = "0.11.1"
-serde = { version = "1", features = ["derive", "rc"] }
+serde = { version = "1.0", features = ["derive", "rc"] }
 serde_json = { version = "1.0", features = ["preserve_order"] }
 serde_path_to_error = "0.1.4"
 toml = "0.5"

crates/vim/Cargo.toml 🔗

@@ -13,7 +13,7 @@ collections = { path = "../collections" }
 editor = { path = "../editor" }
 gpui = { path = "../gpui" }
 language = { path = "../language" }
-serde = { version = "1", features = ["derive"] }
+serde = { version = "1.0", features = ["derive", "rc"] }
 settings = { path = "../settings" }
 workspace = { path = "../workspace" }
 itertools = "0.10"

crates/workspace/Cargo.toml 🔗

@@ -25,7 +25,7 @@ futures = "0.3"
 log = { version = "0.4.16", features = ["kv_unstable_serde"] }
 parking_lot = "0.11.1"
 postage = { version = "0.4.1", features = ["futures-traits"] }
-serde = { version = "1", features = ["derive", "rc"] }
+serde = { version = "1.0", features = ["derive", "rc"] }
 serde_json = { version = "1.0", features = ["preserve_order"] }
 smallvec = { version = "1.6", features = ["union"] }
 

crates/zed/Cargo.toml 🔗

@@ -76,7 +76,7 @@ rand = "0.8.3"
 regex = "1.5"
 rsa = "0.4"
 rust-embed = { version = "6.3", features = ["include-exclude"] }
-serde = { version = "1", features = ["derive"] }
+serde = { version = "1.0", features = ["derive", "rc"] }
 serde_json = { version = "1.0", features = ["preserve_order"] }
 serde_path_to_error = "0.1.4"
 simplelog = "0.9"