@@ -1733,13 +1733,19 @@ impl Project {
async fn send_buffer_messages(
this: WeakModelHandle<Self>,
- mut rx: UnboundedReceiver<BufferMessage>,
+ rx: UnboundedReceiver<BufferMessage>,
mut cx: AsyncAppContext,
- ) {
+ ) -> Option<()> {
+ const MAX_BATCH_SIZE: usize = 128;
+
let mut needs_resync_with_host = false;
- while let Some(change) = rx.next().await {
- if let Some(this) = this.upgrade(&mut cx) {
- let is_local = this.read_with(&cx, |this, _| this.is_local());
+ let mut operations_by_buffer_id = HashMap::default();
+ let mut changes = rx.ready_chunks(MAX_BATCH_SIZE);
+ while let Some(changes) = changes.next().await {
+ let this = this.upgrade(&mut cx)?;
+ let is_local = this.read_with(&cx, |this, _| this.is_local());
+
+ for change in changes {
match change {
BufferMessage::Operation {
buffer_id,
@@ -1748,21 +1754,14 @@ impl Project {
if needs_resync_with_host {
continue;
}
- let request = this.read_with(&cx, |this, _| {
- let project_id = this.remote_id()?;
- Some(this.client.request(proto::UpdateBuffer {
- buffer_id,
- project_id,
- operations: vec![operation],
- }))
- });
- if let Some(request) = request {
- if request.await.is_err() && !is_local {
- needs_resync_with_host = true;
- }
- }
+
+ operations_by_buffer_id
+ .entry(buffer_id)
+ .or_insert(Vec::new())
+ .push(operation);
}
BufferMessage::Resync => {
+ operations_by_buffer_id.clear();
if this
.update(&mut cx, |this, cx| this.synchronize_remote_buffers(cx))
.await
@@ -1772,10 +1771,27 @@ impl Project {
}
}
}
- } else {
- break;
+ }
+
+ for (buffer_id, operations) in operations_by_buffer_id.drain() {
+ let request = this.read_with(&cx, |this, _| {
+ let project_id = this.remote_id()?;
+ Some(this.client.request(proto::UpdateBuffer {
+ buffer_id,
+ project_id,
+ operations,
+ }))
+ });
+ if let Some(request) = request {
+ if request.await.is_err() && !is_local {
+ needs_resync_with_host = true;
+ break;
+ }
+ }
}
}
+
+ None
}
fn on_buffer_event(