Send buffer operations in batches to reduce latency

Antonio Scandurra and Max Brunsfeld created

Co-Authored-By: Max Brunsfeld <max@zed.dev>

Change summary

crates/project/src/project.rs | 56 +++++++++++++++++++++++-------------
1 file changed, 36 insertions(+), 20 deletions(-)

Detailed changes

crates/project/src/project.rs 🔗

@@ -1733,13 +1733,19 @@ impl Project {
 
     async fn send_buffer_messages(
         this: WeakModelHandle<Self>,
-        mut rx: UnboundedReceiver<BufferMessage>,
+        rx: UnboundedReceiver<BufferMessage>,
         mut cx: AsyncAppContext,
-    ) {
+    ) -> Option<()> {
+        const MAX_BATCH_SIZE: usize = 128;
+
         let mut needs_resync_with_host = false;
-        while let Some(change) = rx.next().await {
-            if let Some(this) = this.upgrade(&mut cx) {
-                let is_local = this.read_with(&cx, |this, _| this.is_local());
+        let mut operations_by_buffer_id = HashMap::default();
+        let mut changes = rx.ready_chunks(MAX_BATCH_SIZE);
+        while let Some(changes) = changes.next().await {
+            let this = this.upgrade(&mut cx)?;
+            let is_local = this.read_with(&cx, |this, _| this.is_local());
+
+            for change in changes {
                 match change {
                     BufferMessage::Operation {
                         buffer_id,
@@ -1748,21 +1754,14 @@ impl Project {
                         if needs_resync_with_host {
                             continue;
                         }
-                        let request = this.read_with(&cx, |this, _| {
-                            let project_id = this.remote_id()?;
-                            Some(this.client.request(proto::UpdateBuffer {
-                                buffer_id,
-                                project_id,
-                                operations: vec![operation],
-                            }))
-                        });
-                        if let Some(request) = request {
-                            if request.await.is_err() && !is_local {
-                                needs_resync_with_host = true;
-                            }
-                        }
+
+                        operations_by_buffer_id
+                            .entry(buffer_id)
+                            .or_insert(Vec::new())
+                            .push(operation);
                     }
                     BufferMessage::Resync => {
+                        operations_by_buffer_id.clear();
                         if this
                             .update(&mut cx, |this, cx| this.synchronize_remote_buffers(cx))
                             .await
@@ -1772,10 +1771,27 @@ impl Project {
                         }
                     }
                 }
-            } else {
-                break;
+            }
+
+            for (buffer_id, operations) in operations_by_buffer_id.drain() {
+                let request = this.read_with(&cx, |this, _| {
+                    let project_id = this.remote_id()?;
+                    Some(this.client.request(proto::UpdateBuffer {
+                        buffer_id,
+                        project_id,
+                        operations,
+                    }))
+                });
+                if let Some(request) = request {
+                    if request.await.is_err() && !is_local {
+                        needs_resync_with_host = true;
+                        break;
+                    }
+                }
             }
         }
+
+        None
     }
 
     fn on_buffer_event(