Merge pull request #1557 from zed-industries/operations-hang

Created by Nathan Sobo

Prevent guest disconnection when opening buffers with many operations

Change summary

crates/editor/src/multi_buffer.rs |  15 +++
crates/language/src/buffer.rs     | 102 +++++++++--------------
crates/language/src/proto.rs      |  89 +++++++++++----------
crates/language/src/tests.rs      |  44 ++++++++-
crates/project/src/project.rs     | 137 ++++++++++++++++++++++++++------
crates/rpc/proto/zed.proto        |  20 ++-
crates/rpc/src/rpc.rs             |   2 
crates/text/src/text.rs           |   8 
8 files changed, 261 insertions(+), 156 deletions(-)

Detailed changes

crates/editor/src/multi_buffer.rs 🔗

@@ -3304,8 +3304,19 @@ mod tests {
     fn test_remote_multibuffer(cx: &mut MutableAppContext) {
         let host_buffer = cx.add_model(|cx| Buffer::new(0, "a", cx));
         let guest_buffer = cx.add_model(|cx| {
-            let message = host_buffer.read(cx).to_proto();
-            Buffer::from_proto(1, message, None, cx).unwrap()
+            let state = host_buffer.read(cx).to_proto();
+            let ops = cx
+                .background()
+                .block(host_buffer.read(cx).serialize_ops(cx));
+            let mut buffer = Buffer::from_proto(1, state, None).unwrap();
+            buffer
+                .apply_ops(
+                    ops.into_iter()
+                        .map(|op| language::proto::deserialize_operation(op).unwrap()),
+                    cx,
+                )
+                .unwrap();
+            buffer
         });
         let multibuffer = cx.add_model(|cx| MultiBuffer::singleton(guest_buffer.clone(), cx));
         let snapshot = multibuffer.read(cx).snapshot(cx);
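
For reference, the guest-side pattern this test now exercises — build the replica from the constant-size state, then apply the separately shipped operations — can be factored into a small helper. A minimal sketch; `build_replica` is a hypothetical name, and the `Buffer::from_proto`, `deserialize_operation`, and `apply_ops` signatures are the ones introduced in the diffs below:

    use anyhow::Result;
    use clock::ReplicaId;
    use gpui::ModelContext;
    use language::{proto, Buffer};

    // Hypothetical helper: rebuild a buffer replica from the state message plus
    // the operations that are now transmitted separately.
    fn build_replica(
        replica_id: ReplicaId,
        state: proto::BufferState,
        ops: Vec<proto::Operation>,
        cx: &mut ModelContext<Buffer>,
    ) -> Result<Buffer> {
        let mut buffer = Buffer::from_proto(replica_id, state, None)?;
        let ops = ops
            .into_iter()
            .map(proto::deserialize_operation)
            .collect::<Result<Vec<_>>>()?;
        buffer.apply_ops(ops, cx)?;
        Ok(buffer)
    }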

crates/language/src/buffer.rs 🔗

@@ -70,6 +70,7 @@ pub struct Buffer {
     diagnostics_timestamp: clock::Lamport,
     file_update_count: usize,
     completion_triggers: Vec<String>,
+    completion_triggers_timestamp: clock::Lamport,
     deferred_ops: OperationQueue<Operation>,
 }
 
@@ -358,9 +359,8 @@ impl Buffer {
 
     pub fn from_proto(
         replica_id: ReplicaId,
-        message: proto::Buffer,
+        message: proto::BufferState,
         file: Option<Arc<dyn File>>,
-        cx: &mut ModelContext<Self>,
     ) -> Result<Self> {
         let buffer = TextBuffer::new(replica_id, message.id, message.base_text);
         let mut this = Self::build(buffer, file);
@@ -368,74 +368,51 @@ impl Buffer {
             proto::LineEnding::from_i32(message.line_ending)
                 .ok_or_else(|| anyhow!("missing line_ending"))?,
         ));
-        let ops = message
-            .operations
-            .into_iter()
-            .map(proto::deserialize_operation)
-            .collect::<Result<Vec<_>>>()?;
-        this.apply_ops(ops, cx)?;
-
-        for selection_set in message.selections {
-            let lamport_timestamp = clock::Lamport {
-                replica_id: selection_set.replica_id as ReplicaId,
-                value: selection_set.lamport_timestamp,
-            };
-            this.remote_selections.insert(
-                selection_set.replica_id as ReplicaId,
-                SelectionSet {
-                    line_mode: selection_set.line_mode,
-                    selections: proto::deserialize_selections(selection_set.selections),
-                    lamport_timestamp,
-                },
-            );
-            this.text.lamport_clock.observe(lamport_timestamp);
-        }
-        let snapshot = this.snapshot();
-        let entries = proto::deserialize_diagnostics(message.diagnostics);
-        this.apply_diagnostic_update(
-            DiagnosticSet::from_sorted_entries(entries.iter().cloned(), &snapshot),
-            clock::Lamport {
-                replica_id: 0,
-                value: message.diagnostics_timestamp,
-            },
-            cx,
-        );
-
-        this.completion_triggers = message.completion_triggers;
-
         Ok(this)
     }
 
-    pub fn to_proto(&self) -> proto::Buffer {
-        let mut operations = self
-            .text
-            .history()
-            .map(|op| proto::serialize_operation(&Operation::Buffer(op.clone())))
-            .chain(self.deferred_ops.iter().map(proto::serialize_operation))
-            .collect::<Vec<_>>();
-        operations.sort_unstable_by_key(proto::lamport_timestamp_for_operation);
-        proto::Buffer {
+    pub fn to_proto(&self) -> proto::BufferState {
+        proto::BufferState {
             id: self.remote_id(),
             file: self.file.as_ref().map(|f| f.to_proto()),
             base_text: self.base_text().to_string(),
-            operations,
-            selections: self
-                .remote_selections
-                .iter()
-                .map(|(replica_id, set)| proto::SelectionSet {
-                    replica_id: *replica_id as u32,
-                    selections: proto::serialize_selections(&set.selections),
-                    lamport_timestamp: set.lamport_timestamp.value,
-                    line_mode: set.line_mode,
-                })
-                .collect(),
-            diagnostics: proto::serialize_diagnostics(self.diagnostics.iter()),
-            diagnostics_timestamp: self.diagnostics_timestamp.value,
-            completion_triggers: self.completion_triggers.clone(),
             line_ending: proto::serialize_line_ending(self.line_ending()) as i32,
         }
     }
 
+    pub fn serialize_ops(&self, cx: &AppContext) -> Task<Vec<proto::Operation>> {
+        let mut operations = Vec::new();
+        operations.extend(self.deferred_ops.iter().map(proto::serialize_operation));
+        operations.extend(self.remote_selections.iter().map(|(_, set)| {
+            proto::serialize_operation(&Operation::UpdateSelections {
+                selections: set.selections.clone(),
+                lamport_timestamp: set.lamport_timestamp,
+                line_mode: set.line_mode,
+            })
+        }));
+        operations.push(proto::serialize_operation(&Operation::UpdateDiagnostics {
+            diagnostics: self.diagnostics.iter().cloned().collect(),
+            lamport_timestamp: self.diagnostics_timestamp,
+        }));
+        operations.push(proto::serialize_operation(
+            &Operation::UpdateCompletionTriggers {
+                triggers: self.completion_triggers.clone(),
+                lamport_timestamp: self.completion_triggers_timestamp,
+            },
+        ));
+
+        let text_operations = self.text.operations().clone();
+        cx.background().spawn(async move {
+            operations.extend(
+                text_operations
+                    .iter()
+                    .map(|(_, op)| proto::serialize_operation(&Operation::Buffer(op.clone()))),
+            );
+            operations.sort_unstable_by_key(proto::lamport_timestamp_for_operation);
+            operations
+        })
+    }
+
     pub fn with_language(mut self, language: Arc<Language>, cx: &mut ModelContext<Self>) -> Self {
         self.set_language(Some(language), cx);
         self
@@ -470,6 +447,7 @@ impl Buffer {
             diagnostics_timestamp: Default::default(),
             file_update_count: 0,
             completion_triggers: Default::default(),
+            completion_triggers_timestamp: Default::default(),
             deferred_ops: OperationQueue::new(),
         }
     }
@@ -1517,11 +1495,11 @@ impl Buffer {
 
     pub fn set_completion_triggers(&mut self, triggers: Vec<String>, cx: &mut ModelContext<Self>) {
         self.completion_triggers = triggers.clone();
-        let lamport_timestamp = self.text.lamport_clock.tick();
+        self.completion_triggers_timestamp = self.text.lamport_clock.tick();
         self.send_operation(
             Operation::UpdateCompletionTriggers {
                 triggers,
-                lamport_timestamp,
+                lamport_timestamp: self.completion_triggers_timestamp,
             },
             cx,
         );
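
On the host side, `to_proto` is now a cheap, constant-size snapshot, while `serialize_ops` moves the per-operation work onto the background executor. A minimal sketch of how a caller captures both (the helper name is hypothetical; `Project::create_buffer_for_peer` below is the real call site):

    use gpui::{AppContext, Task};
    use language::{proto, Buffer};

    // Hypothetical helper mirroring the new call site: the state is captured
    // synchronously, the operation history resolves later on a background thread.
    fn snapshot_for_peer(
        buffer: &Buffer,
        cx: &AppContext,
    ) -> (proto::BufferState, Task<Vec<proto::Operation>>) {
        let state = buffer.to_proto();      // id, file, base text, line ending only
        let ops = buffer.serialize_ops(cx); // sorted by Lamport timestamp when awaited
        (state, ops)
    }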

crates/language/src/proto.rs 🔗

@@ -1,6 +1,5 @@
 use crate::{
     diagnostic_set::DiagnosticEntry, CodeAction, CodeLabel, Completion, Diagnostic, Language,
-    Operation,
 };
 use anyhow::{anyhow, Result};
 use clock::ReplicaId;
@@ -9,7 +8,7 @@ use rpc::proto;
 use std::{ops::Range, sync::Arc};
 use text::*;
 
-pub use proto::{Buffer, LineEnding, SelectionSet};
+pub use proto::{BufferState, LineEnding, Operation, SelectionSet};
 
 pub fn deserialize_line_ending(message: proto::LineEnding) -> text::LineEnding {
     match message {
@@ -25,13 +24,13 @@ pub fn serialize_line_ending(message: text::LineEnding) -> proto::LineEnding {
     }
 }
 
-pub fn serialize_operation(operation: &Operation) -> proto::Operation {
+pub fn serialize_operation(operation: &crate::Operation) -> proto::Operation {
     proto::Operation {
         variant: Some(match operation {
-            Operation::Buffer(text::Operation::Edit(edit)) => {
+            crate::Operation::Buffer(text::Operation::Edit(edit)) => {
                 proto::operation::Variant::Edit(serialize_edit_operation(edit))
             }
-            Operation::Buffer(text::Operation::Undo {
+            crate::Operation::Buffer(text::Operation::Undo {
                 undo,
                 lamport_timestamp,
             }) => proto::operation::Variant::Undo(proto::operation::Undo {
@@ -49,7 +48,7 @@ pub fn serialize_operation(operation: &Operation) -> proto::Operation {
                     })
                     .collect(),
             }),
-            Operation::UpdateSelections {
+            crate::Operation::UpdateSelections {
                 selections,
                 line_mode,
                 lamport_timestamp,
@@ -59,7 +58,7 @@ pub fn serialize_operation(operation: &Operation) -> proto::Operation {
                 selections: serialize_selections(selections),
                 line_mode: *line_mode,
             }),
-            Operation::UpdateDiagnostics {
+            crate::Operation::UpdateDiagnostics {
                 diagnostics,
                 lamport_timestamp,
             } => proto::operation::Variant::UpdateDiagnostics(proto::UpdateDiagnostics {
@@ -67,7 +66,7 @@ pub fn serialize_operation(operation: &Operation) -> proto::Operation {
                 lamport_timestamp: lamport_timestamp.value,
                 diagnostics: serialize_diagnostics(diagnostics.iter()),
             }),
-            Operation::UpdateCompletionTriggers {
+            crate::Operation::UpdateCompletionTriggers {
                 triggers,
                 lamport_timestamp,
             } => proto::operation::Variant::UpdateCompletionTriggers(
@@ -165,41 +164,43 @@ pub fn serialize_anchor(anchor: &Anchor) -> proto::Anchor {
     }
 }
 
-pub fn deserialize_operation(message: proto::Operation) -> Result<Operation> {
+pub fn deserialize_operation(message: proto::Operation) -> Result<crate::Operation> {
     Ok(
         match message
             .variant
             .ok_or_else(|| anyhow!("missing operation variant"))?
         {
             proto::operation::Variant::Edit(edit) => {
-                Operation::Buffer(text::Operation::Edit(deserialize_edit_operation(edit)))
+                crate::Operation::Buffer(text::Operation::Edit(deserialize_edit_operation(edit)))
             }
-            proto::operation::Variant::Undo(undo) => Operation::Buffer(text::Operation::Undo {
-                lamport_timestamp: clock::Lamport {
-                    replica_id: undo.replica_id as ReplicaId,
-                    value: undo.lamport_timestamp,
-                },
-                undo: UndoOperation {
-                    id: clock::Local {
+            proto::operation::Variant::Undo(undo) => {
+                crate::Operation::Buffer(text::Operation::Undo {
+                    lamport_timestamp: clock::Lamport {
                         replica_id: undo.replica_id as ReplicaId,
-                        value: undo.local_timestamp,
+                        value: undo.lamport_timestamp,
                     },
-                    version: deserialize_version(undo.version),
-                    counts: undo
-                        .counts
-                        .into_iter()
-                        .map(|c| {
-                            (
-                                clock::Local {
-                                    replica_id: c.replica_id as ReplicaId,
-                                    value: c.local_timestamp,
-                                },
-                                c.count,
-                            )
-                        })
-                        .collect(),
-                },
-            }),
+                    undo: UndoOperation {
+                        id: clock::Local {
+                            replica_id: undo.replica_id as ReplicaId,
+                            value: undo.local_timestamp,
+                        },
+                        version: deserialize_version(undo.version),
+                        counts: undo
+                            .counts
+                            .into_iter()
+                            .map(|c| {
+                                (
+                                    clock::Local {
+                                        replica_id: c.replica_id as ReplicaId,
+                                        value: c.local_timestamp,
+                                    },
+                                    c.count,
+                                )
+                            })
+                            .collect(),
+                    },
+                })
+            }
             proto::operation::Variant::UpdateSelections(message) => {
                 let selections = message
                     .selections
@@ -215,7 +216,7 @@ pub fn deserialize_operation(message: proto::Operation) -> Result<Operation> {
                     })
                     .collect::<Vec<_>>();
 
-                Operation::UpdateSelections {
+                crate::Operation::UpdateSelections {
                     lamport_timestamp: clock::Lamport {
                         replica_id: message.replica_id as ReplicaId,
                         value: message.lamport_timestamp,
@@ -224,15 +225,17 @@ pub fn deserialize_operation(message: proto::Operation) -> Result<Operation> {
                     line_mode: message.line_mode,
                 }
             }
-            proto::operation::Variant::UpdateDiagnostics(message) => Operation::UpdateDiagnostics {
-                diagnostics: deserialize_diagnostics(message.diagnostics),
-                lamport_timestamp: clock::Lamport {
-                    replica_id: message.replica_id as ReplicaId,
-                    value: message.lamport_timestamp,
-                },
-            },
+            proto::operation::Variant::UpdateDiagnostics(message) => {
+                crate::Operation::UpdateDiagnostics {
+                    diagnostics: deserialize_diagnostics(message.diagnostics),
+                    lamport_timestamp: clock::Lamport {
+                        replica_id: message.replica_id as ReplicaId,
+                        value: message.lamport_timestamp,
+                    },
+                }
+            }
             proto::operation::Variant::UpdateCompletionTriggers(message) => {
-                Operation::UpdateCompletionTriggers {
+                crate::Operation::UpdateCompletionTriggers {
                     triggers: message.triggers,
                     lamport_timestamp: clock::Lamport {
                         replica_id: message.replica_id as ReplicaId,
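
Because the `Operation` re-export now names the wire type (`proto::Operation`) rather than `crate::Operation`, the conversion pair is spelled with explicit paths. A round-trip sketch (the `roundtrip` helper is hypothetical):

    use anyhow::Result;
    use language::proto::{deserialize_operation, serialize_operation, Operation as WireOperation};

    // Serialize an in-memory operation to its protobuf form and back.
    fn roundtrip(op: &language::Operation) -> Result<language::Operation> {
        let wire: WireOperation = serialize_operation(op);
        deserialize_operation(wire)
    }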

crates/language/src/tests.rs 🔗

@@ -2,6 +2,7 @@ use super::*;
 use clock::ReplicaId;
 use collections::BTreeMap;
 use gpui::{ModelHandle, MutableAppContext};
+use proto::deserialize_operation;
 use rand::prelude::*;
 use settings::Settings;
 use std::{
@@ -1047,8 +1048,19 @@ fn test_serialization(cx: &mut gpui::MutableAppContext) {
     });
     assert_eq!(buffer1.read(cx).text(), "abcDF");
 
-    let message = buffer1.read(cx).to_proto();
-    let buffer2 = cx.add_model(|cx| Buffer::from_proto(1, message, None, cx).unwrap());
+    let state = buffer1.read(cx).to_proto();
+    let ops = cx.background().block(buffer1.read(cx).serialize_ops(cx));
+    let buffer2 = cx.add_model(|cx| {
+        let mut buffer = Buffer::from_proto(1, state, None).unwrap();
+        buffer
+            .apply_ops(
+                ops.into_iter()
+                    .map(|op| proto::deserialize_operation(op).unwrap()),
+                cx,
+            )
+            .unwrap();
+        buffer
+    });
     assert_eq!(buffer2.read(cx).text(), "abcDF");
 }
 
@@ -1075,9 +1087,18 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) {
 
     for i in 0..rng.gen_range(min_peers..=max_peers) {
         let buffer = cx.add_model(|cx| {
-            let mut buffer =
-                Buffer::from_proto(i as ReplicaId, base_buffer.read(cx).to_proto(), None, cx)
-                    .unwrap();
+            let state = base_buffer.read(cx).to_proto();
+            let ops = cx
+                .background()
+                .block(base_buffer.read(cx).serialize_ops(cx));
+            let mut buffer = Buffer::from_proto(i as ReplicaId, state, None).unwrap();
+            buffer
+                .apply_ops(
+                    ops.into_iter()
+                        .map(|op| proto::deserialize_operation(op).unwrap()),
+                    cx,
+                )
+                .unwrap();
             buffer.set_group_interval(Duration::from_millis(rng.gen_range(0..=200)));
             let network = network.clone();
             cx.subscribe(&cx.handle(), move |buffer, _, event, _| {
@@ -1164,7 +1185,8 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) {
                 mutation_count -= 1;
             }
             50..=59 if replica_ids.len() < max_peers => {
-                let old_buffer = buffer.read(cx).to_proto();
+                let old_buffer_state = buffer.read(cx).to_proto();
+                let old_buffer_ops = cx.background().block(buffer.read(cx).serialize_ops(cx));
                 let new_replica_id = (0..=replica_ids.len() as ReplicaId)
                     .filter(|replica_id| *replica_id != buffer.read(cx).replica_id())
                     .choose(&mut rng)
@@ -1176,7 +1198,15 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) {
                 );
                 new_buffer = Some(cx.add_model(|cx| {
                     let mut new_buffer =
-                        Buffer::from_proto(new_replica_id, old_buffer, None, cx).unwrap();
+                        Buffer::from_proto(new_replica_id, old_buffer_state, None).unwrap();
+                    new_buffer
+                        .apply_ops(
+                            old_buffer_ops
+                                .into_iter()
+                                .map(|op| deserialize_operation(op).unwrap()),
+                            cx,
+                        )
+                        .unwrap();
                     log::info!(
                         "New replica {} text: {:?}",
                         new_buffer.replica_id(),

crates/project/src/project.rs 🔗

@@ -123,6 +123,7 @@ pub struct Project {
     loading_local_worktrees:
         HashMap<Arc<Path>, Shared<Task<Result<ModelHandle<Worktree>, Arc<anyhow::Error>>>>>,
     opened_buffers: HashMap<u64, OpenBuffer>,
+    incomplete_buffers: HashMap<u64, ModelHandle<Buffer>>,
     buffer_snapshots: HashMap<u64, Vec<(i32, TextBufferSnapshot)>>,
     nonce: u128,
     initialized_persistent_state: bool,
@@ -144,7 +145,7 @@ pub enum JoinProjectError {
 enum OpenBuffer {
     Strong(ModelHandle<Buffer>),
     Weak(WeakModelHandle<Buffer>),
-    Loading(Vec<Operation>),
+    Operations(Vec<Operation>),
 }
 
 enum WorktreeHandle {
@@ -461,6 +462,7 @@ impl Project {
                 collaborators: Default::default(),
                 opened_buffers: Default::default(),
                 shared_buffers: Default::default(),
+                incomplete_buffers: Default::default(),
                 loading_buffers: Default::default(),
                 loading_local_worktrees: Default::default(),
                 buffer_snapshots: Default::default(),
@@ -550,6 +552,7 @@ impl Project {
                 loading_buffers: Default::default(),
                 opened_buffer: watch::channel(),
                 shared_buffers: Default::default(),
+                incomplete_buffers: Default::default(),
                 loading_local_worktrees: Default::default(),
                 active_entry: None,
                 collaborators: Default::default(),
@@ -1331,7 +1334,7 @@ impl Project {
                         *open_buffer = OpenBuffer::Strong(buffer);
                     }
                 }
-                OpenBuffer::Loading(_) => unreachable!(),
+                OpenBuffer::Operations(_) => unreachable!(),
             }
         }
 
@@ -1456,6 +1459,10 @@ impl Project {
             }
             cx.emit(Event::DisconnectedFromHost);
             cx.notify();
+
+            // Wake up all futures currently waiting on a buffer to get opened,
+            // to give them a chance to fail now that we've disconnected.
+            *self.opened_buffer.0.borrow_mut() = ();
         }
     }
 
@@ -1757,7 +1764,7 @@ impl Project {
 
         match self.opened_buffers.insert(remote_id, open_buffer) {
             None => {}
-            Some(OpenBuffer::Loading(operations)) => {
+            Some(OpenBuffer::Operations(operations)) => {
                 buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))?
             }
             Some(OpenBuffer::Weak(existing_handle)) => {
@@ -5107,7 +5114,7 @@ impl Project {
                     OpenBuffer::Strong(buffer) => {
                         buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?;
                     }
-                    OpenBuffer::Loading(operations) => operations.extend_from_slice(&ops),
+                    OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops),
                     OpenBuffer::Weak(_) => {}
                 },
                 hash_map::Entry::Vacant(e) => {
@@ -5116,7 +5123,7 @@ impl Project {
                         "received buffer update from {:?}",
                         envelope.original_sender_id
                     );
-                    e.insert(OpenBuffer::Loading(ops));
+                    e.insert(OpenBuffer::Operations(ops));
                 }
             }
             Ok(())
@@ -5130,24 +5137,52 @@ impl Project {
         mut cx: AsyncAppContext,
     ) -> Result<()> {
         this.update(&mut cx, |this, cx| {
-            let mut buffer = envelope
+            match envelope
                 .payload
-                .buffer
-                .ok_or_else(|| anyhow!("invalid buffer"))?;
-            let mut buffer_file = None;
-            if let Some(file) = buffer.file.take() {
-                let worktree_id = WorktreeId::from_proto(file.worktree_id);
-                let worktree = this
-                    .worktree_for_id(worktree_id, cx)
-                    .ok_or_else(|| anyhow!("no worktree found for id {}", file.worktree_id))?;
-                buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
-                    as Arc<dyn language::File>);
-            }
+                .variant
+                .ok_or_else(|| anyhow!("missing variant"))?
+            {
+                proto::create_buffer_for_peer::Variant::State(mut state) => {
+                    let mut buffer_file = None;
+                    if let Some(file) = state.file.take() {
+                        let worktree_id = WorktreeId::from_proto(file.worktree_id);
+                        let worktree = this.worktree_for_id(worktree_id, cx).ok_or_else(|| {
+                            anyhow!("no worktree found for id {}", file.worktree_id)
+                        })?;
+                        buffer_file = Some(Arc::new(File::from_proto(file, worktree.clone(), cx)?)
+                            as Arc<dyn language::File>);
+                    }
 
-            let buffer = cx.add_model(|cx| {
-                Buffer::from_proto(this.replica_id(), buffer, buffer_file, cx).unwrap()
-            });
-            this.register_buffer(&buffer, cx)?;
+                    let buffer_id = state.id;
+                    let buffer = cx.add_model(|_| {
+                        Buffer::from_proto(this.replica_id(), state, buffer_file).unwrap()
+                    });
+                    this.incomplete_buffers.insert(buffer_id, buffer);
+                }
+                proto::create_buffer_for_peer::Variant::Chunk(chunk) => {
+                    let buffer = this
+                        .incomplete_buffers
+                        .get(&chunk.buffer_id)
+                        .ok_or_else(|| {
+                            anyhow!(
+                                "received chunk for buffer {} without initial state",
+                                chunk.buffer_id
+                            )
+                        })?
+                        .clone();
+                    let operations = chunk
+                        .operations
+                        .into_iter()
+                        .map(language::proto::deserialize_operation)
+                        .collect::<Result<Vec<_>>>()?;
+                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))?;
+
+                    if chunk.is_last {
+                        this.incomplete_buffers.remove(&chunk.buffer_id);
+                        this.register_buffer(&buffer, cx)?;
+                    }
+                }
+            }
 
             Ok(())
         })
@@ -5658,13 +5693,54 @@ impl Project {
         if let Some(project_id) = self.remote_id() {
             let shared_buffers = self.shared_buffers.entry(peer_id).or_default();
             if shared_buffers.insert(buffer_id) {
-                self.client
-                    .send(proto::CreateBufferForPeer {
-                        project_id,
-                        peer_id: peer_id.0,
-                        buffer: Some(buffer.read(cx).to_proto()),
-                    })
-                    .log_err();
+                let buffer = buffer.read(cx);
+                let state = buffer.to_proto();
+                let operations = buffer.serialize_ops(cx);
+                let client = self.client.clone();
+                cx.background()
+                    .spawn(
+                        async move {
+                            let mut operations = operations.await;
+
+                            client.send(proto::CreateBufferForPeer {
+                                project_id,
+                                peer_id: peer_id.0,
+                                variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
+                            })?;
+
+                            loop {
+                                #[cfg(any(test, feature = "test-support"))]
+                                const CHUNK_SIZE: usize = 5;
+
+                                #[cfg(not(any(test, feature = "test-support")))]
+                                const CHUNK_SIZE: usize = 100;
+
+                                let chunk = operations
+                                    .drain(..cmp::min(CHUNK_SIZE, operations.len()))
+                                    .collect();
+                                let is_last = operations.is_empty();
+                                client.send(proto::CreateBufferForPeer {
+                                    project_id,
+                                    peer_id: peer_id.0,
+                                    variant: Some(proto::create_buffer_for_peer::Variant::Chunk(
+                                        proto::BufferChunk {
+                                            buffer_id,
+                                            operations: chunk,
+                                            is_last,
+                                        },
+                                    )),
+                                })?;
+
+                                if is_last {
+                                    break;
+                                }
+                            }
+
+                            Ok(())
+                        }
+                        .log_err(),
+                    )
+                    .detach();
             }
         }
 
@@ -5686,7 +5762,10 @@ impl Project {
                 });
                 if let Some(buffer) = buffer {
                     break buffer;
+                } else if this.read_with(&cx, |this, _| this.is_read_only()) {
+                    return Err(anyhow!("disconnected before buffer {} could be opened", id));
                 }
+
                 opened_buffer_rx
                     .next()
                     .await
@@ -6026,7 +6105,7 @@ impl OpenBuffer {
         match self {
             OpenBuffer::Strong(handle) => Some(handle.clone()),
             OpenBuffer::Weak(handle) => handle.upgrade(cx),
-            OpenBuffer::Loading(_) => None,
+            OpenBuffer::Operations(_) => None,
         }
     }
 }
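
The chunking loop above is worth isolating: it drains at most CHUNK_SIZE operations per message and flags the final one, and since an empty history still produces a single (empty) chunk, the guest is guaranteed to see is_last == true. A standalone sketch of the same logic (the `chunks` helper is hypothetical):

    use std::cmp;

    // Sketch of the chunking strategy in Project::create_buffer_for_peer: split
    // `items` into runs of at most `chunk_size`, flagging the last run. An empty
    // input still yields one (empty) final chunk.
    fn chunks<T>(mut items: Vec<T>, chunk_size: usize) -> Vec<(Vec<T>, bool)> {
        let mut out = Vec::new();
        loop {
            let chunk: Vec<T> = items.drain(..cmp::min(chunk_size, items.len())).collect();
            let is_last = items.is_empty();
            out.push((chunk, is_last));
            if is_last {
                break;
            }
        }
        out
    }

    // e.g. chunks((0..7).collect(), 5) => [([0, 1, 2, 3, 4], false), ([5, 6], true)]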

crates/rpc/proto/zed.proto 🔗

@@ -370,7 +370,10 @@ message OpenBufferResponse {
 message CreateBufferForPeer {
     uint64 project_id = 1;
     uint32 peer_id = 2;
-    Buffer buffer = 3;
+    oneof variant {
+        BufferState state = 3;
+        BufferChunk chunk = 4;
+    }
 }
 
 message UpdateBuffer {
@@ -808,16 +811,17 @@ message Entry {
     bool is_ignored = 7;
 }
 
-message Buffer {
+message BufferState {
     uint64 id = 1;
     optional File file = 2;
     string base_text = 3;
-    repeated Operation operations = 4;
-    repeated SelectionSet selections = 5;
-    repeated Diagnostic diagnostics = 6;
-    uint32 diagnostics_timestamp = 7;
-    repeated string completion_triggers = 8;
-    LineEnding line_ending = 9;
+    LineEnding line_ending = 4;
+}
+
+message BufferChunk {
+    uint64 buffer_id = 1;
+    repeated Operation operations = 2;
+    bool is_last = 3;
 }
 
 enum LineEnding {
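
In the generated Rust code the new oneof surfaces as an optional `create_buffer_for_peer::Variant` enum, which is what the project.rs handler above matches on. A sketch of constructing the two message shapes (helper names are hypothetical):

    use rpc::proto;

    // First message: the buffer's constant-size state.
    fn state_message(project_id: u64, peer_id: u32, state: proto::BufferState) -> proto::CreateBufferForPeer {
        proto::CreateBufferForPeer {
            project_id,
            peer_id,
            variant: Some(proto::create_buffer_for_peer::Variant::State(state)),
        }
    }

    // Follow-up messages: operation chunks, the final one flagged with `is_last`.
    fn chunk_message(
        project_id: u64,
        peer_id: u32,
        buffer_id: u64,
        operations: Vec<proto::Operation>,
        is_last: bool,
    ) -> proto::CreateBufferForPeer {
        proto::CreateBufferForPeer {
            project_id,
            peer_id,
            variant: Some(proto::create_buffer_for_peer::Variant::Chunk(proto::BufferChunk {
                buffer_id,
                operations,
                is_last,
            })),
        }
    }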

crates/rpc/src/rpc.rs 🔗

@@ -6,4 +6,4 @@ pub use conn::Connection;
 pub use peer::*;
 mod macros;
 
-pub const PROTOCOL_VERSION: u32 = 30;
+pub const PROTOCOL_VERSION: u32 = 31;

crates/text/src/text.rs 🔗

@@ -45,7 +45,7 @@ use std::{
 };
 pub use subscription::*;
 pub use sum_tree::Bias;
-use sum_tree::{FilterCursor, SumTree};
+use sum_tree::{FilterCursor, SumTree, TreeMap};
 
 lazy_static! {
     static ref CARRIAGE_RETURNS_REGEX: Regex = Regex::new("\r\n|\r").unwrap();
@@ -109,7 +109,7 @@ impl HistoryEntry {
 struct History {
     // TODO: Turn this into a String or Rope, maybe.
     base_text: Arc<str>,
-    operations: HashMap<clock::Local, Operation>,
+    operations: TreeMap<clock::Local, Operation>,
     insertion_slices: HashMap<clock::Local, Vec<InsertionSlice>>,
     undo_stack: Vec<HistoryEntry>,
     redo_stack: Vec<HistoryEntry>,
@@ -1213,8 +1213,8 @@ impl Buffer {
         &self.history.base_text
     }
 
-    pub fn history(&self) -> impl Iterator<Item = &Operation> {
-        self.history.operations.values()
+    pub fn operations(&self) -> &TreeMap<clock::Local, Operation> {
+        &self.history.operations
     }
 
     pub fn undo_history(&self) -> impl Iterator<Item = (&clock::Local, &[(clock::Local, u32)])> {
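
Switching the history from a HashMap to the sum_tree-backed TreeMap, and exposing it via `operations()`, is what lets `Buffer::serialize_ops` clone the whole map on the main thread and serialize it in the background (the assumption being that cloning the TreeMap shares tree nodes rather than copying every operation). A sketch of that pattern against the text buffer alone (`serialize_history` is hypothetical):

    use gpui::{AppContext, Task};
    use language::{proto, Operation};

    // Hypothetical helper: snapshot the history cheaply, then serialize it off-thread,
    // mirroring what language::Buffer::serialize_ops does for text operations.
    fn serialize_history(buffer: &text::Buffer, cx: &AppContext) -> Task<Vec<proto::Operation>> {
        let history = buffer.operations().clone(); // TreeMap snapshot
        cx.background().spawn(async move {
            history
                .iter()
                .map(|(_, op)| proto::serialize_operation(&Operation::Buffer(op.clone())))
                .collect()
        })
    }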