Send and receive buffer operations

Max Brunsfeld and Nathan Sobo created

Co-Authored-By: Nathan Sobo <nathan@zed.dev>

Change summary

zed-rpc/proto/zed.proto  |  31 ++
zed-rpc/src/proto.rs     |   1 
zed/src/editor/buffer.rs | 465 +++++++++++++++++++++++++----------------
zed/src/worktree.rs      | 142 +++++++++---
4 files changed, 421 insertions(+), 218 deletions(-)

Detailed changes

zed-rpc/proto/zed.proto 🔗

@@ -15,7 +15,7 @@ message Envelope {
         OpenBuffer open_buffer = 10;
         OpenBufferResponse open_buffer_response = 11;
         CloseBuffer close_buffer = 12;
-        EditBuffer edit_buffer = 13;
+        UpdateBuffer update_buffer = 13;
     }
 }
 
@@ -70,7 +70,7 @@ message CloseBuffer {
     uint64 buffer_id = 2;
 }
 
-message EditBuffer {
+message UpdateBuffer {
     uint64 worktree_id = 1;
     uint64 buffer_id = 2;
     repeated Operation operations = 3;
@@ -99,13 +99,31 @@ message Entry {
 message Buffer {
     uint64 id = 1;
     string content = 2;
-    repeated Operation history = 3;
+    repeated Operation.Edit history = 3;
+}
+
+message Selection {
+    uint64 id = 1;
+    Anchor start = 2;
+    Anchor end = 3;
+    bool reversed = 4;
+}
+
+message Anchor {
+    repeated VectorClockEntry version = 1;
+    uint64 offset = 2;
+    Bias bias = 3;
+    enum Bias {
+        LEFT = 0;
+        Right = 1;
+    }
 }
 
 message Operation {
     oneof variant {
         Edit edit = 1;
         Undo undo = 2;
+        UpdateSelections update_selections = 3;
     }
 
     message Edit {
@@ -125,6 +143,13 @@ message Operation {
         uint32 edit_local_timestamp = 5;
         uint32 count = 6;
     }
+
+    message UpdateSelections {
+        uint32 replica_id = 1;
+        uint32 local_timestamp = 2;
+        uint32 lamport_timestamp = 3;
+        repeated Selection selections = 4;
+    }
 }
 
 message VectorClockEntry {

zed-rpc/src/proto.rs 🔗

@@ -73,6 +73,7 @@ request_message!(ShareWorktree, ShareWorktreeResponse);
 request_message!(OpenWorktree, OpenWorktreeResponse);
 request_message!(OpenBuffer, OpenBufferResponse);
 message!(CloseBuffer);
+message!(UpdateBuffer);
 
 /// A stream of protobuf messages.
 pub struct MessageStream<T> {

zed/src/editor/buffer.rs 🔗

@@ -16,7 +16,6 @@ use zed_rpc::proto;
 use crate::{
     language::{Language, Tree},
     operation_queue::{self, OperationQueue},
-    rpc,
     settings::{StyleId, ThemeMap},
     sum_tree::{self, FilterCursor, SumTree},
     time::{self, ReplicaId},
@@ -29,6 +28,7 @@ use lazy_static::lazy_static;
 use std::{
     cell::RefCell,
     cmp,
+    convert::{TryFrom, TryInto},
     hash::BuildHasher,
     iter::Iterator,
     ops::{Deref, DerefMut, Range},
@@ -118,7 +118,6 @@ pub struct Buffer {
     undo_map: UndoMap,
     history: History,
     file: Option<File>,
-    rpc: Option<rpc::Client>,
     language: Option<Arc<Language>>,
     syntax_tree: Mutex<Option<SyntaxTree>>,
     is_parsing: bool,
@@ -127,15 +126,13 @@ pub struct Buffer {
     deferred_ops: OperationQueue<Operation>,
     deferred_replicas: HashSet<ReplicaId>,
     replica_id: ReplicaId,
-    remote_id: Option<u64>,
+    remote_id: u64,
     local_clock: time::Local,
     lamport_clock: time::Lamport,
-    operation_callback: Option<OperationCallback>,
+    #[cfg(test)]
+    operations: Vec<Operation>,
 }
 
-type OperationCallback =
-    Box<dyn 'static + Send + Sync + FnMut(Operation, &mut ModelContext<Buffer>)>;
-
 #[derive(Clone)]
 struct SyntaxTree {
     tree: Tree,
@@ -427,8 +424,7 @@ impl Buffer {
             replica_id,
             History::new(base_text.into()),
             None,
-            None,
-            None,
+            cx.model_id() as u64,
             None,
             cx,
         )
@@ -441,15 +437,21 @@ impl Buffer {
         language: Option<Arc<Language>>,
         cx: &mut ModelContext<Self>,
     ) -> Self {
-        Self::build(replica_id, history, file, None, None, language, cx)
+        Self::build(
+            replica_id,
+            history,
+            file,
+            cx.model_id() as u64,
+            language,
+            cx,
+        )
     }
 
     fn build(
         replica_id: ReplicaId,
         history: History,
         file: Option<File>,
-        rpc: Option<rpc::Client>,
-        remote_id: Option<u64>,
+        remote_id: u64,
         language: Option<Arc<Language>>,
         cx: &mut ModelContext<Self>,
     ) -> Self {
@@ -486,7 +488,6 @@ impl Buffer {
             undo_map: Default::default(),
             history,
             file,
-            rpc,
             syntax_tree: Mutex::new(None),
             is_parsing: false,
             language,
@@ -497,9 +498,11 @@ impl Buffer {
             deferred_replicas: HashSet::default(),
             replica_id,
             remote_id,
-            operation_callback: None,
             local_clock: time::Local::new(replica_id),
             lamport_clock: time::Lamport::new(replica_id),
+
+            #[cfg(test)]
+            operations: Default::default(),
         };
         result.reparse(cx);
         result
@@ -518,7 +521,6 @@ impl Buffer {
         replica_id: ReplicaId,
         message: proto::Buffer,
         file: Option<File>,
-        rpc: rpc::Client,
         language: Option<Arc<Language>>,
         cx: &mut ModelContext<Self>,
     ) -> Result<Self> {
@@ -526,96 +528,20 @@ impl Buffer {
             replica_id,
             History::new(message.content.into()),
             file,
-            Some(rpc),
-            Some(message.id),
+            message.id,
             language,
             cx,
         );
         let ops = message
             .history
             .into_iter()
-            .filter_map(|op| op.variant)
-            .map(|op| match op {
-                proto::operation::Variant::Edit(edit) => {
-                    let mut version = time::Global::new();
-                    for entry in edit.version {
-                        version.observe(time::Local {
-                            replica_id: entry.replica_id as ReplicaId,
-                            value: entry.timestamp,
-                        });
-                    }
-                    let ranges = edit
-                        .ranges
-                        .into_iter()
-                        .map(|range| range.start as usize..range.end as usize)
-                        .collect();
-                    Operation::Edit(EditOperation {
-                        timestamp: InsertionTimestamp {
-                            replica_id: edit.replica_id as ReplicaId,
-                            local: edit.local_timestamp,
-                            lamport: edit.lamport_timestamp,
-                        },
-                        version,
-                        ranges,
-                        new_text: edit.new_text,
-                    })
-                }
-                proto::operation::Variant::Undo(undo) => Operation::Undo {
-                    lamport_timestamp: time::Lamport {
-                        replica_id: undo.replica_id as ReplicaId,
-                        value: undo.lamport_timestamp,
-                    },
-                    undo: UndoOperation {
-                        id: time::Local {
-                            replica_id: undo.replica_id as ReplicaId,
-                            value: undo.local_timestamp,
-                        },
-                        edit_id: time::Local {
-                            replica_id: undo.edit_replica_id as ReplicaId,
-                            value: undo.edit_local_timestamp,
-                        },
-                        count: undo.count,
-                    },
-                },
-            });
+            .map(|op| Operation::Edit(op.into()));
         buffer.apply_ops(ops, cx)?;
         Ok(buffer)
     }
 
     pub fn to_proto(&self, cx: &mut ModelContext<Self>) -> proto::Buffer {
-        let ops = self
-            .history
-            .ops
-            .values()
-            .map(|op| {
-                let version = op
-                    .version
-                    .iter()
-                    .map(|entry| proto::VectorClockEntry {
-                        replica_id: entry.replica_id as u32,
-                        timestamp: entry.value,
-                    })
-                    .collect();
-                let ranges = op
-                    .ranges
-                    .iter()
-                    .map(|range| proto::Range {
-                        start: range.start as u64,
-                        end: range.end as u64,
-                    })
-                    .collect();
-                proto::Operation {
-                    variant: Some(proto::operation::Variant::Edit(proto::operation::Edit {
-                        replica_id: op.timestamp.replica_id as u32,
-                        local_timestamp: op.timestamp.local,
-                        lamport_timestamp: op.timestamp.lamport,
-                        version,
-                        ranges,
-                        new_text: op.new_text.clone(),
-                    })),
-                }
-            })
-            .collect();
+        let ops = self.history.ops.values().map(Into::into).collect();
         proto::Buffer {
             id: cx.model_id() as u64,
             content: self.history.base_text.to_string(),
@@ -657,7 +583,7 @@ impl Buffer {
     ) {
         if let Some(new_file) = new_file {
             let buffer = cx.handle();
-            new_file.saved_buffer(buffer, cx.as_mut());
+            new_file.buffer_added(buffer, cx.as_mut());
             self.file = Some(new_file);
         }
         if let Some(file) = &self.file {
@@ -907,6 +833,10 @@ impl Buffer {
                 .map_or(false, |file| file.mtime(cx) > self.saved_mtime)
     }
 
+    pub fn remote_id(&self) -> u64 {
+        self.remote_id
+    }
+
     pub fn version(&self) -> time::Global {
         self.version.clone()
     }
@@ -1406,17 +1336,16 @@ impl Buffer {
         self.lamport_clock.observe(timestamp.lamport());
     }
 
+    #[cfg(not(test))]
     pub fn send_operation(&mut self, operation: Operation, cx: &mut ModelContext<Self>) {
-        if let Some(operation_callback) = self.operation_callback.as_mut() {
-            operation_callback(operation, cx);
+        if let Some(file) = &self.file {
+            file.buffer_updated(cx.handle(), operation, cx.as_mut());
         }
     }
 
-    pub fn on_operation(
-        &mut self,
-        callback: impl FnMut(Operation, &mut ModelContext<Self>) + Send + Sync + 'static,
-    ) {
-        self.operation_callback = Some(Box::new(callback));
+    #[cfg(test)]
+    pub fn send_operation(&mut self, operation: Operation, _: &mut ModelContext<Self>) {
+        self.operations.push(operation);
     }
 
     pub fn undo(&mut self, cx: &mut ModelContext<Self>) {
@@ -1821,16 +1750,17 @@ impl Clone for Buffer {
             selections_last_update: self.selections_last_update.clone(),
             deferred_ops: self.deferred_ops.clone(),
             file: self.file.clone(),
-            rpc: self.rpc.clone(),
             language: self.language.clone(),
             syntax_tree: Mutex::new(self.syntax_tree.lock().clone()),
             is_parsing: false,
             deferred_replicas: self.deferred_replicas.clone(),
             replica_id: self.replica_id,
-            operation_callback: None,
             remote_id: self.remote_id.clone(),
             local_clock: self.local_clock.clone(),
             lamport_clock: self.lamport_clock.clone(),
+
+            #[cfg(test)]
+            operations: self.operations.clone(),
         }
     }
 }
@@ -1967,22 +1897,8 @@ impl Entity for Buffer {
     type Event = Event;
 
     fn release(&mut self, cx: &mut gpui::MutableAppContext) {
-        if let (Some(buffer_id), Some(file)) = (self.remote_id, self.file.as_ref()) {
-            let rpc = self.rpc.clone().unwrap();
-            let worktree_id = file.worktree_id() as u64;
-            cx.background()
-                .spawn(async move {
-                    if let Err(error) = rpc
-                        .send(proto::CloseBuffer {
-                            worktree_id,
-                            buffer_id,
-                        })
-                        .await
-                    {
-                        log::error!("error closing remote buffer: {}", error);
-                    };
-                })
-                .detach();
+        if let Some(file) = self.file.as_ref() {
+            file.buffer_removed(self.remote_id, cx);
         }
     }
 }
@@ -2321,6 +2237,232 @@ impl Operation {
     }
 }
 
+impl<'a> Into<proto::Operation> for &'a Operation {
+    fn into(self) -> proto::Operation {
+        proto::Operation {
+            variant: Some(match self {
+                Operation::Edit(edit) => proto::operation::Variant::Edit(edit.into()),
+                Operation::Undo {
+                    undo,
+                    lamport_timestamp,
+                } => proto::operation::Variant::Undo(proto::operation::Undo {
+                    replica_id: undo.id.replica_id as u32,
+                    local_timestamp: undo.id.value,
+                    lamport_timestamp: lamport_timestamp.value,
+                    edit_replica_id: undo.edit_id.replica_id as u32,
+                    edit_local_timestamp: undo.edit_id.value,
+                    count: undo.count,
+                }),
+                Operation::UpdateSelections {
+                    set_id,
+                    selections,
+                    lamport_timestamp,
+                } => proto::operation::Variant::UpdateSelections(
+                    proto::operation::UpdateSelections {
+                        replica_id: set_id.replica_id as u32,
+                        local_timestamp: set_id.value,
+                        lamport_timestamp: lamport_timestamp.value,
+                        selections: selections.as_ref().map_or(Vec::new(), |selections| {
+                            selections
+                                .iter()
+                                .map(|selection| proto::Selection {
+                                    id: selection.id as u64,
+                                    start: Some((&selection.start).into()),
+                                    end: Some((&selection.end).into()),
+                                    reversed: selection.reversed,
+                                })
+                                .collect()
+                        }),
+                    },
+                ),
+            }),
+        }
+    }
+}
+
+impl<'a> Into<proto::operation::Edit> for &'a EditOperation {
+    fn into(self) -> proto::operation::Edit {
+        let version = self
+            .version
+            .iter()
+            .map(|entry| proto::VectorClockEntry {
+                replica_id: entry.replica_id as u32,
+                timestamp: entry.value,
+            })
+            .collect();
+        let ranges = self
+            .ranges
+            .iter()
+            .map(|range| proto::Range {
+                start: range.start as u64,
+                end: range.end as u64,
+            })
+            .collect();
+        proto::operation::Edit {
+            replica_id: self.timestamp.replica_id as u32,
+            local_timestamp: self.timestamp.local,
+            lamport_timestamp: self.timestamp.lamport,
+            version,
+            ranges,
+            new_text: self.new_text.clone(),
+        }
+    }
+}
+
+impl<'a> Into<proto::Anchor> for &'a Anchor {
+    fn into(self) -> proto::Anchor {
+        match self {
+            Anchor::Middle {
+                offset,
+                bias,
+                version,
+            } => proto::Anchor {
+                version: version
+                    .iter()
+                    .map(|entry| proto::VectorClockEntry {
+                        replica_id: entry.replica_id as u32,
+                        timestamp: entry.value,
+                    })
+                    .collect(),
+                offset: *offset as u64,
+                bias: match bias {
+                    Bias::Left => proto::anchor::Bias::Left as i32,
+                    Bias::Right => proto::anchor::Bias::Right as i32,
+                },
+            },
+            Anchor::Start => proto::Anchor {
+                version: Vec::new(),
+                bias: proto::anchor::Bias::Left as i32,
+                offset: 0,
+            },
+            Anchor::End => proto::Anchor {
+                version: Vec::new(),
+                bias: proto::anchor::Bias::Right as i32,
+                offset: u64::MAX,
+            },
+        }
+    }
+}
+
+impl TryFrom<proto::Operation> for Operation {
+    type Error = anyhow::Error;
+
+    fn try_from(message: proto::Operation) -> Result<Self, Self::Error> {
+        Ok(
+            match message
+                .variant
+                .ok_or_else(|| anyhow!("missing operation variant"))?
+            {
+                proto::operation::Variant::Edit(edit) => Operation::Edit(edit.into()),
+                proto::operation::Variant::Undo(undo) => Operation::Undo {
+                    lamport_timestamp: time::Lamport {
+                        replica_id: undo.replica_id as ReplicaId,
+                        value: undo.lamport_timestamp,
+                    },
+                    undo: UndoOperation {
+                        id: time::Local {
+                            replica_id: undo.replica_id as ReplicaId,
+                            value: undo.local_timestamp,
+                        },
+                        edit_id: time::Local {
+                            replica_id: undo.edit_replica_id as ReplicaId,
+                            value: undo.edit_local_timestamp,
+                        },
+                        count: undo.count,
+                    },
+                },
+                proto::operation::Variant::UpdateSelections(message) => {
+                    Operation::UpdateSelections {
+                        set_id: time::Lamport {
+                            replica_id: message.replica_id as ReplicaId,
+                            value: message.local_timestamp,
+                        },
+                        lamport_timestamp: time::Lamport {
+                            replica_id: message.replica_id as ReplicaId,
+                            value: message.lamport_timestamp,
+                        },
+                        selections: Some(
+                            message
+                                .selections
+                                .into_iter()
+                                .map(|selection| {
+                                    Ok(Selection {
+                                        id: selection.id as usize,
+                                        start: selection
+                                            .start
+                                            .ok_or_else(|| anyhow!("missing selection start"))?
+                                            .try_into()?,
+                                        end: selection
+                                            .end
+                                            .ok_or_else(|| anyhow!("missing selection end"))?
+                                            .try_into()?,
+                                        reversed: selection.reversed,
+                                        goal: SelectionGoal::None,
+                                    })
+                                })
+                                .collect::<Result<Vec<_>, anyhow::Error>>()?
+                                .into(),
+                        ),
+                    }
+                }
+            },
+        )
+    }
+}
+
+impl From<proto::operation::Edit> for EditOperation {
+    fn from(edit: proto::operation::Edit) -> Self {
+        let mut version = time::Global::new();
+        for entry in edit.version {
+            version.observe(time::Local {
+                replica_id: entry.replica_id as ReplicaId,
+                value: entry.timestamp,
+            });
+        }
+        let ranges = edit
+            .ranges
+            .into_iter()
+            .map(|range| range.start as usize..range.end as usize)
+            .collect();
+        EditOperation {
+            timestamp: InsertionTimestamp {
+                replica_id: edit.replica_id as ReplicaId,
+                local: edit.local_timestamp,
+                lamport: edit.lamport_timestamp,
+            },
+            version,
+            ranges,
+            new_text: edit.new_text,
+        }
+    }
+}
+
+impl TryFrom<proto::Anchor> for Anchor {
+    type Error = anyhow::Error;
+
+    fn try_from(message: proto::Anchor) -> Result<Self, Self::Error> {
+        let mut version = time::Global::new();
+        for entry in message.version {
+            version.observe(time::Local {
+                replica_id: entry.replica_id as ReplicaId,
+                value: entry.timestamp,
+            });
+        }
+
+        Ok(Self::Middle {
+            offset: message.offset as usize,
+            bias: if message.bias == proto::anchor::Bias::Left as i32 {
+                Bias::Left
+            } else if message.bias == proto::anchor::Bias::Right as i32 {
+                Bias::Right
+            } else {
+                Err(anyhow!("invalid anchor bias {}", message.bias))?
+            },
+            version,
+        })
+    }
+}
+
 impl operation_queue::Operation for Operation {
     fn timestamp(&self) -> time::Lamport {
         self.lamport_timestamp()
@@ -2419,13 +2561,7 @@ mod tests {
 
         let buffer1 = cx.add_model(|cx| Buffer::new(0, "abcdef", cx));
         let buffer2 = cx.add_model(|cx| Buffer::new(1, "abcdef", cx));
-        let buffer_ops = Arc::new(Mutex::new(Vec::new()));
-        buffer1.update(cx, |buffer, cx| {
-            buffer.on_operation({
-                let buffer_ops = buffer_ops.clone();
-                move |op, _| buffer_ops.lock().push(op)
-            });
-
+        let buffer_ops = buffer1.update(cx, |buffer, cx| {
             let buffer_1_events = buffer_1_events.clone();
             cx.subscribe(&buffer1, move |_, event, _| {
                 buffer_1_events.borrow_mut().push(event.clone())
@@ -2452,14 +2588,14 @@ mod tests {
 
             // Undoing a transaction emits one edited event.
             buffer.undo(cx);
+
+            buffer.operations.clone()
         });
 
         // Incorporating a set of remote ops emits a single edited event,
         // followed by a dirtied event.
         buffer2.update(cx, |buffer, cx| {
-            buffer
-                .apply_ops::<Vec<_>>(mem::take(buffer_ops.lock().as_mut()), cx)
-                .unwrap();
+            buffer.apply_ops(buffer_ops, cx).unwrap();
         });
 
         let buffer_1_events = buffer_1_events.borrow();
@@ -3080,22 +3216,14 @@ mod tests {
         cx.add_model(|cx| {
             let mut buffer = Buffer::new(0, "1234", cx);
 
-            let operations = Arc::new(Mutex::new(Vec::new()));
-            buffer.on_operation({
-                let edits = operations.clone();
-                move |operation, _| {
-                    edits.lock().push(operation);
-                }
-            });
-
             buffer.edit(vec![1..1], "abx", cx);
             buffer.edit(vec![3..4], "yzef", cx);
             buffer.edit(vec![3..5], "cd", cx);
             assert_eq!(buffer.text(), "1abcdef234");
 
-            let edit1 = operations.lock()[0].clone();
-            let edit2 = operations.lock()[1].clone();
-            let edit3 = operations.lock()[2].clone();
+            let edit1 = buffer.operations[0].clone();
+            let edit2 = buffer.operations[1].clone();
+            let edit3 = buffer.operations[2].clone();
 
             buffer.undo_or_redo(edit1.edit_id().unwrap(), cx).unwrap();
             assert_eq!(buffer.text(), "1cdef234");
@@ -3194,40 +3322,22 @@ mod tests {
         let buffer2 = cx.add_model(|cx| Buffer::new(2, text, cx));
         let buffer3 = cx.add_model(|cx| Buffer::new(3, text, cx));
 
-        let ops = Arc::new(Mutex::new(Vec::new()));
-
-        buffer1.update(cx, |buffer, cx| {
-            buffer.on_operation({
-                let ops = ops.clone();
-                move |operation, _| ops.lock().push(operation)
-            });
-
+        let buf1_op = buffer1.update(cx, |buffer, cx| {
             buffer.edit(vec![1..2], "12", cx);
             assert_eq!(buffer.text(), "a12cdef");
+            buffer.operations.last().unwrap().clone()
         });
-        buffer2.update(cx, |buffer, cx| {
-            buffer.on_operation({
-                let ops = ops.clone();
-                move |operation, _| ops.lock().push(operation)
-            });
-
+        let buf2_op = buffer2.update(cx, |buffer, cx| {
             buffer.edit(vec![3..4], "34", cx);
             assert_eq!(buffer.text(), "abc34ef");
+            buffer.operations.last().unwrap().clone()
         });
-        buffer3.update(cx, |buffer, cx| {
-            buffer.on_operation({
-                let ops = ops.clone();
-                move |operation, _| ops.lock().push(operation)
-            });
-
+        let buf3_op = buffer3.update(cx, |buffer, cx| {
             buffer.edit(vec![5..6], "56", cx);
             assert_eq!(buffer.text(), "abcde56");
+            buffer.operations.last().unwrap().clone()
         });
 
-        let buf1_op = ops.lock()[0].clone();
-        let buf2_op = ops.lock()[1].clone();
-        let buf3_op = ops.lock()[2].clone();
-
         buffer1.update(cx, |buffer, _| {
             buffer.apply_op(buf2_op.clone()).unwrap();
             buffer.apply_op(buf3_op.clone()).unwrap();
@@ -3265,7 +3375,6 @@ mod tests {
         for seed in start_seed..start_seed + iterations {
             dbg!(seed);
             let mut rng = StdRng::seed_from_u64(seed);
-            let network = Arc::new(Mutex::new(Network::new(StdRng::seed_from_u64(seed))));
 
             let base_text_len = rng.gen_range(0..10);
             let base_text = RandomCharIter::new(&mut rng)
@@ -3273,22 +3382,14 @@ mod tests {
                 .collect::<String>();
             let mut replica_ids = Vec::new();
             let mut buffers = Vec::new();
+            let mut network = Network::new(StdRng::seed_from_u64(seed));
+
             for i in 0..peers {
-                let buffer = cx.add_model(|cx| {
-                    let replica_id = i as ReplicaId;
-                    let mut buffer = Buffer::new(replica_id, base_text.as_str(), cx);
-                    buffer.on_operation({
-                        let network = network.clone();
-                        move |op, _| {
-                            network.lock().broadcast(replica_id, vec![op]);
-                        }
-                    });
-                    buffer
-                });
+                let buffer = cx.add_model(|cx| Buffer::new(i as ReplicaId, base_text.as_str(), cx));
 
                 buffers.push(buffer);
                 replica_ids.push(i as u16);
-                network.lock().add_peer(i as u16);
+                network.add_peer(i as u16);
             }
 
             log::info!("initial text: {:?}", base_text);
@@ -3300,15 +3401,17 @@ mod tests {
                 buffers[replica_index].update(cx, |buffer, cx| match rng.gen_range(0..=100) {
                     0..=50 if mutation_count != 0 => {
                         buffer.randomly_mutate(&mut rng, cx);
+                        network.broadcast(buffer.replica_id, mem::take(&mut buffer.operations));
                         log::info!("buffer {} text: {:?}", buffer.replica_id, buffer.text());
                         mutation_count -= 1;
                     }
                     51..=70 if mutation_count != 0 => {
                         buffer.randomly_undo_redo(&mut rng, cx);
+                        network.broadcast(buffer.replica_id, mem::take(&mut buffer.operations));
                         mutation_count -= 1;
                     }
-                    71..=100 if network.lock().has_unreceived(replica_id) => {
-                        let ops = network.lock().receive(replica_id);
+                    71..=100 if network.has_unreceived(replica_id) => {
+                        let ops = network.receive(replica_id);
                         if !ops.is_empty() {
                             log::info!(
                                 "peer {} applying {} ops from the network.",
@@ -3321,7 +3424,7 @@ mod tests {
                     _ => {}
                 });
 
-                if mutation_count == 0 && network.lock().is_idle() {
+                if mutation_count == 0 && network.is_idle() {
                     break;
                 }
             }

zed/src/worktree.rs 🔗

@@ -4,7 +4,7 @@ mod ignore;
 
 use self::{char_bag::CharBag, ignore::IgnoreStack};
 use crate::{
-    editor::{Buffer, History, Rope},
+    editor::{Buffer, History, Operation, Rope},
     language::LanguageRegistry,
     rpc::{self, proto},
     sum_tree::{self, Cursor, Edit, SumTree},
@@ -46,6 +46,7 @@ lazy_static! {
 pub fn init(cx: &mut MutableAppContext, rpc: rpc::Client) {
     rpc.on_message(remote::open_buffer, cx);
     rpc.on_message(remote::close_buffer, cx);
+    rpc.on_message(remote::update_buffer, cx);
 }
 
 #[derive(Clone, Debug)]
@@ -194,7 +195,7 @@ pub struct LocalWorktree {
     scan_state: (watch::Sender<ScanState>, watch::Receiver<ScanState>),
     _event_stream_handle: fsevent::Handle,
     poll_scheduled: bool,
-    rpc: Option<rpc::Client>,
+    rpc: Option<(rpc::Client, u64)>,
     open_buffers: HashMap<usize, WeakModelHandle<Buffer>>,
 }
 
@@ -298,22 +299,7 @@ impl LocalWorktree {
                 let language = language_registry.select_language(&path).cloned();
                 let file = File::new(handle, path.into());
                 let buffer = cx.add_model(|cx| {
-                    let mut buffer = Buffer::from_history(
-                        0,
-                        History::new(contents.into()),
-                        Some(file),
-                        language,
-                        cx,
-                    );
-                    buffer.on_operation({
-                        let worktree = handle.clone();
-                        move |operation, cx| {
-                            worktree.update(cx, |tree, cx| {
-                                // tree.buffer_changed(cx.model_id(), operation)
-                            });
-                        }
-                    });
-                    buffer
+                    Buffer::from_history(0, History::new(contents.into()), Some(file), language, cx)
                 });
                 this.update(&mut cx, |this, _| {
                     let this = this.as_local_mut().unwrap();
@@ -500,14 +486,13 @@ impl LocalWorktree {
 
     pub fn share(
         &mut self,
-        client: rpc::Client,
+        rpc: rpc::Client,
         cx: &mut ModelContext<Worktree>,
     ) -> Task<anyhow::Result<(u64, String)>> {
-        self.rpc = Some(client.clone());
         let root_name = self.root_name.clone();
         let snapshot = self.snapshot();
         let handle = cx.handle();
-        cx.spawn(|_this, cx| async move {
+        cx.spawn(|this, mut cx| async move {
             let entries = cx
                 .background()
                 .spawn(async move {
@@ -526,20 +511,23 @@ impl LocalWorktree {
                 })
                 .await;
 
-            let share_response = client
+            let share_response = rpc
                 .request(proto::ShareWorktree {
                     worktree: Some(proto::Worktree { root_name, entries }),
                 })
                 .await?;
 
-            client
-                .state
+            rpc.state
                 .lock()
                 .await
                 .shared_worktrees
                 .insert(share_response.worktree_id, handle);
 
             log::info!("sharing worktree {:?}", share_response);
+
+            this.update(&mut cx, |worktree, _| {
+                worktree.as_local_mut().unwrap().rpc = Some((rpc, share_response.worktree_id));
+            });
             Ok((share_response.worktree_id, share_response.access_token))
         })
     }
@@ -664,8 +652,7 @@ impl RemoteWorktree {
                 let remote_buffer = response.buffer.ok_or_else(|| anyhow!("empty buffer"))?;
                 let buffer_id = remote_buffer.id;
                 let buffer = cx.add_model(|cx| {
-                    Buffer::from_proto(replica_id, remote_buffer, Some(file), rpc, language, cx)
-                        .unwrap()
+                    Buffer::from_proto(replica_id, remote_buffer, Some(file), language, cx).unwrap()
                 });
                 this.update(&mut cx, |this, _| {
                     let this = this.as_remote_mut().unwrap();
@@ -936,7 +923,7 @@ impl File {
         Self { worktree, path }
     }
 
-    pub fn saved_buffer(&self, buffer: ModelHandle<Buffer>, cx: &mut MutableAppContext) {
+    pub fn buffer_added(&self, buffer: ModelHandle<Buffer>, cx: &mut MutableAppContext) {
         self.worktree.update(cx, |worktree, _| {
             if let Worktree::Local(worktree) = worktree {
                 worktree
@@ -946,6 +933,58 @@ impl File {
         })
     }
 
+    pub fn buffer_updated(
+        &self,
+        buffer: ModelHandle<Buffer>,
+        operation: Operation,
+        cx: &mut MutableAppContext,
+    ) {
+        let buffer_id = buffer.read(cx).remote_id();
+        self.worktree.update(cx, |worktree, cx| {
+            if let Some((rpc, remote_id)) = match worktree {
+                Worktree::Local(worktree) => worktree.rpc.clone(),
+                Worktree::Remote(worktree) => Some((worktree.rpc.clone(), worktree.remote_id)),
+            } {
+                cx.background()
+                    .spawn(async move {
+                        if let Err(error) = rpc
+                            .send(proto::UpdateBuffer {
+                                worktree_id: remote_id,
+                                buffer_id,
+                                operations: Some(operation).iter().map(Into::into).collect(),
+                            })
+                            .await
+                        {
+                            log::error!("error sending buffer operation: {}", error);
+                        }
+                    })
+                    .detach();
+            }
+        });
+    }
+
+    pub fn buffer_removed(&self, buffer_id: u64, cx: &mut MutableAppContext) {
+        self.worktree.update(cx, |worktree, cx| {
+            if let Worktree::Remote(worktree) = worktree {
+                let worktree_id = worktree.remote_id;
+                let rpc = worktree.rpc.clone();
+                cx.background()
+                    .spawn(async move {
+                        if let Err(error) = rpc
+                            .send(proto::CloseBuffer {
+                                worktree_id,
+                                buffer_id,
+                            })
+                            .await
+                        {
+                            log::error!("error closing remote buffer: {}", error);
+                        };
+                    })
+                    .detach();
+            }
+        });
+    }
+
     /// Returns this file's path relative to the root of its worktree.
     pub fn path(&self) -> Arc<Path> {
         self.path.clone()
@@ -1751,16 +1790,18 @@ impl<'a> Iterator for ChildEntriesIter<'a> {
 }
 
 mod remote {
+    use std::convert::TryInto;
+
     use super::*;
     use crate::rpc::TypedEnvelope;
 
     pub async fn open_buffer(
-        request: TypedEnvelope<proto::OpenBuffer>,
+        envelope: TypedEnvelope<proto::OpenBuffer>,
         rpc: &rpc::Client,
         cx: &mut AsyncAppContext,
     ) -> anyhow::Result<()> {
-        let message = &request.payload;
-        let peer_id = request
+        let message = &envelope.payload;
+        let peer_id = envelope
             .original_sender_id
             .ok_or_else(|| anyhow!("missing original sender id"))?;
 
@@ -1787,7 +1828,7 @@ mod remote {
             .insert(buffer.id() as u64, buffer.clone());
 
         rpc.respond(
-            request.receipt(),
+            envelope.receipt(),
             proto::OpenBufferResponse {
                 buffer: Some(buffer.update(cx, |buf, cx| buf.to_proto(cx))),
             },
@@ -1798,20 +1839,53 @@ mod remote {
     }
 
     pub async fn close_buffer(
-        message: TypedEnvelope<proto::CloseBuffer>,
+        envelope: TypedEnvelope<proto::CloseBuffer>,
         rpc: &rpc::Client,
         _: &mut AsyncAppContext,
     ) -> anyhow::Result<()> {
-        let peer_id = message
+        let peer_id = envelope
             .original_sender_id
             .ok_or_else(|| anyhow!("missing original sender id"))?;
-        let message = &message.payload;
+        let message = &envelope.payload;
         let mut state = rpc.state.lock().await;
         state.shared_buffers.entry(peer_id).and_modify(|buffers| {
             buffers.remove(&message.buffer_id);
         });
         Ok(())
     }
+
+    pub async fn update_buffer(
+        envelope: TypedEnvelope<proto::UpdateBuffer>,
+        rpc: &rpc::Client,
+        cx: &mut AsyncAppContext,
+    ) -> anyhow::Result<()> {
+        let peer_id = envelope
+            .original_sender_id
+            .ok_or_else(|| anyhow!("missing original sender id"))?;
+        let message = envelope.payload;
+        if let Some(buffer) = rpc
+            .state
+            .lock()
+            .await
+            .shared_buffers
+            .get(&peer_id)
+            .and_then(|buffers| buffers.get(&message.buffer_id))
+            .cloned()
+        {
+            if let Err(error) = buffer.update(cx, |buffer, cx| {
+                let ops = message
+                    .operations
+                    .into_iter()
+                    .map(|op| op.try_into())
+                    .collect::<anyhow::Result<Vec<_>>>()?;
+                buffer.apply_ops(ops, cx)?;
+                Ok::<(), anyhow::Error>(())
+            }) {
+                log::error!("error applying buffer operations {}", error);
+            }
+        }
+        Ok(())
+    }
 }
 
 #[cfg(test)]