Send buffer operations via the `Project` instead of `Worktree`

Antonio Scandurra created

Change summary

crates/language/src/buffer.rs  | 25 +-------
crates/language/src/tests.rs   | 99 +++++++++++++++++++++--------------
crates/project/src/project.rs  | 38 ++++++++++++-
crates/project/src/worktree.rs | 48 -----------------
crates/server/src/rpc.rs       |  3 +
5 files changed, 101 insertions(+), 112 deletions(-)

Detailed changes

crates/language/src/buffer.rs 🔗

@@ -73,8 +73,6 @@ pub struct Buffer {
     language_server: Option<LanguageServerState>,
     completion_triggers: Vec<String>,
     deferred_ops: OperationQueue<Operation>,
-    #[cfg(test)]
-    pub(crate) operations: Vec<Operation>,
 }
 
 pub struct BufferSnapshot {
@@ -143,7 +141,7 @@ struct LanguageServerSnapshot {
     path: Arc<Path>,
 }
 
-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, PartialEq, Eq)]
 pub enum Operation {
     Buffer(text::Operation),
     UpdateDiagnostics {
@@ -160,8 +158,9 @@ pub enum Operation {
     },
 }
 
-#[derive(Clone, Debug, Eq, PartialEq)]
+#[derive(Clone, Debug, PartialEq, Eq)]
 pub enum Event {
+    Operation(Operation),
     Edited,
     Dirtied,
     Saved,
@@ -202,8 +201,6 @@ pub trait File {
         cx: &mut MutableAppContext,
     ) -> Task<Result<(clock::Global, SystemTime)>>;
 
-    fn buffer_updated(&self, buffer_id: u64, operation: Operation, cx: &mut MutableAppContext);
-
     fn buffer_removed(&self, buffer_id: u64, cx: &mut MutableAppContext);
 
     fn as_any(&self) -> &dyn Any;
@@ -276,8 +273,6 @@ impl File for FakeFile {
         cx.spawn(|_| async move { Ok((Default::default(), SystemTime::UNIX_EPOCH)) })
     }
 
-    fn buffer_updated(&self, _: u64, _: Operation, _: &mut MutableAppContext) {}
-
     fn buffer_removed(&self, _: u64, _: &mut MutableAppContext) {}
 
     fn as_any(&self) -> &dyn Any {
@@ -526,8 +521,6 @@ impl Buffer {
             language_server: None,
             completion_triggers: Default::default(),
             deferred_ops: OperationQueue::new(),
-            #[cfg(test)]
-            operations: Default::default(),
         }
     }
 
@@ -1745,16 +1738,8 @@ impl Buffer {
         }
     }
 
-    #[cfg(not(test))]
-    pub fn send_operation(&mut self, operation: Operation, cx: &mut ModelContext<Self>) {
-        if let Some(file) = &self.file {
-            file.buffer_updated(self.remote_id(), operation, cx.as_mut());
-        }
-    }
-
-    #[cfg(test)]
-    pub fn send_operation(&mut self, operation: Operation, _: &mut ModelContext<Self>) {
-        self.operations.push(operation);
+    fn send_operation(&mut self, operation: Operation, cx: &mut ModelContext<Self>) {
+        cx.emit(Event::Operation(operation));
     }
 
     pub fn remove_peer(&mut self, replica_id: ReplicaId, cx: &mut ModelContext<Self>) {

crates/language/src/tests.rs 🔗

@@ -76,43 +76,48 @@ fn test_edit_events(cx: &mut gpui::MutableAppContext) {
 
     let buffer1 = cx.add_model(|cx| Buffer::new(0, "abcdef", cx));
     let buffer2 = cx.add_model(|cx| Buffer::new(1, "abcdef", cx));
-    let buffer_ops = buffer1.update(cx, |buffer, cx| {
-        let buffer_1_events = buffer_1_events.clone();
-        cx.subscribe(&buffer1, move |_, _, event, _| {
-            buffer_1_events.borrow_mut().push(event.clone())
-        })
-        .detach();
-        let buffer_2_events = buffer_2_events.clone();
-        cx.subscribe(&buffer2, move |_, _, event, _| {
-            buffer_2_events.borrow_mut().push(event.clone())
-        })
-        .detach();
+    let buffer1_ops = Rc::new(RefCell::new(Vec::new()));
+    buffer1.update(cx, {
+        let buffer1_ops = buffer1_ops.clone();
+        |buffer, cx| {
+            let buffer_1_events = buffer_1_events.clone();
+            cx.become_delegate(&buffer1, move |_, _, event, _| match event {
+                Event::Operation(op) => buffer1_ops.borrow_mut().push(op),
+                event @ _ => buffer_1_events.borrow_mut().push(event),
+            })
+            .detach();
+            let buffer_2_events = buffer_2_events.clone();
+            cx.subscribe(&buffer2, move |_, _, event, _| {
+                buffer_2_events.borrow_mut().push(event.clone())
+            })
+            .detach();
 
-        // An edit emits an edited event, followed by a dirtied event,
-        // since the buffer was previously in a clean state.
-        buffer.edit(Some(2..4), "XYZ", cx);
+            // An edit emits an edited event, followed by a dirtied event,
+            // since the buffer was previously in a clean state.
+            buffer.edit(Some(2..4), "XYZ", cx);
 
-        // An empty transaction does not emit any events.
-        buffer.start_transaction();
-        buffer.end_transaction(cx);
+            // An empty transaction does not emit any events.
+            buffer.start_transaction();
+            buffer.end_transaction(cx);
 
-        // A transaction containing two edits emits one edited event.
-        now += Duration::from_secs(1);
-        buffer.start_transaction_at(now);
-        buffer.edit(Some(5..5), "u", cx);
-        buffer.edit(Some(6..6), "w", cx);
-        buffer.end_transaction_at(now, cx);
+            // A transaction containing two edits emits one edited event.
+            now += Duration::from_secs(1);
+            buffer.start_transaction_at(now);
+            buffer.edit(Some(5..5), "u", cx);
+            buffer.edit(Some(6..6), "w", cx);
+            buffer.end_transaction_at(now, cx);
 
-        // Undoing a transaction emits one edited event.
-        buffer.undo(cx);
-
-        buffer.operations.clone()
+            // Undoing a transaction emits one edited event.
+            buffer.undo(cx);
+        }
     });
 
     // Incorporating a set of remote ops emits a single edited event,
     // followed by a dirtied event.
     buffer2.update(cx, |buffer, cx| {
-        buffer.apply_ops(buffer_ops, cx).unwrap();
+        buffer
+            .apply_ops(buffer1_ops.borrow_mut().drain(..), cx)
+            .unwrap();
     });
 
     let buffer_1_events = buffer_1_events.borrow();
@@ -1177,17 +1182,26 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) {
         .collect::<String>();
     let mut replica_ids = Vec::new();
     let mut buffers = Vec::new();
-    let mut network = Network::new(rng.clone());
+    let network = Rc::new(RefCell::new(Network::new(rng.clone())));
 
     for i in 0..rng.gen_range(min_peers..=max_peers) {
         let buffer = cx.add_model(|cx| {
             let mut buffer = Buffer::new(i as ReplicaId, base_text.as_str(), cx);
             buffer.set_group_interval(Duration::from_millis(rng.gen_range(0..=200)));
+            let network = network.clone();
+            cx.become_delegate(&cx.handle(), move |buffer, _, event, _| {
+                if let Event::Operation(op) = event {
+                    network
+                        .borrow_mut()
+                        .broadcast(buffer.replica_id(), vec![proto::serialize_operation(&op)]);
+                }
+            })
+            .detach();
             buffer
         });
         buffers.push(buffer);
         replica_ids.push(i as ReplicaId);
-        network.add_peer(i as ReplicaId);
+        network.borrow_mut().add_peer(i as ReplicaId);
         log::info!("Adding initial peer with replica id {}", i);
     }
 
@@ -1268,10 +1282,20 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) {
                     let mut new_buffer =
                         Buffer::from_proto(new_replica_id, old_buffer, None, cx).unwrap();
                     new_buffer.set_group_interval(Duration::from_millis(rng.gen_range(0..=200)));
+                    let network = network.clone();
+                    cx.become_delegate(&cx.handle(), move |buffer, _, event, _| {
+                        if let Event::Operation(op) = event {
+                            network.borrow_mut().broadcast(
+                                buffer.replica_id(),
+                                vec![proto::serialize_operation(&op)],
+                            );
+                        }
+                    })
+                    .detach();
                     new_buffer
                 }));
                 replica_ids.push(new_replica_id);
-                network.replicate(replica_id, new_replica_id);
+                network.borrow_mut().replicate(replica_id, new_replica_id);
             }
             60..=69 if mutation_count != 0 => {
                 buffer.update(cx, |buffer, cx| {
@@ -1280,8 +1304,9 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) {
                 });
                 mutation_count -= 1;
             }
-            _ if network.has_unreceived(replica_id) => {
+            _ if network.borrow().has_unreceived(replica_id) => {
                 let ops = network
+                    .borrow_mut()
                     .receive(replica_id)
                     .into_iter()
                     .map(|op| proto::deserialize_operation(op).unwrap());
@@ -1297,14 +1322,6 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) {
             _ => {}
         }
 
-        buffer.update(cx, |buffer, _| {
-            let ops = buffer
-                .operations
-                .drain(..)
-                .map(|op| proto::serialize_operation(&op))
-                .collect();
-            network.broadcast(buffer.replica_id(), ops);
-        });
         now += Duration::from_millis(rng.gen_range(0..=200));
         buffers.extend(new_buffer);
 
@@ -1312,7 +1329,7 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) {
             buffer.read(cx).check_invariants();
         }
 
-        if mutation_count == 0 && network.is_idle() {
+        if mutation_count == 0 && network.borrow().is_idle() {
             break;
         }
     }

crates/project/src/project.rs 🔗

@@ -17,8 +17,8 @@ use gpui::{
 use language::{
     proto::{deserialize_anchor, deserialize_version, serialize_anchor, serialize_version},
     range_from_lsp, Anchor, AnchorRangeExt, Bias, Buffer, CodeAction, CodeLabel, Completion,
-    Diagnostic, DiagnosticEntry, File as _, Language, LanguageRegistry, Operation, PointUtf16,
-    ToLspPosition, ToOffset, ToPointUtf16, Transaction,
+    Diagnostic, DiagnosticEntry, Event as BufferEvent, File as _, Language, LanguageRegistry,
+    Operation, PointUtf16, ToLspPosition, ToOffset, ToPointUtf16, Transaction,
 };
 use lsp::{DiagnosticSeverity, DocumentHighlightKind, LanguageServer};
 use lsp_command::*;
@@ -945,10 +945,37 @@ impl Project {
                 remote_id
             ))?,
         }
-        self.assign_language_to_buffer(&buffer, worktree, cx);
+        cx.become_delegate(buffer, Self::on_buffer_event).detach();
+        self.assign_language_to_buffer(buffer, worktree, cx);
+
         Ok(())
     }
 
+    fn on_buffer_event(
+        &mut self,
+        buffer: ModelHandle<Buffer>,
+        event: BufferEvent,
+        cx: &mut ModelContext<Self>,
+    ) {
+        match event {
+            BufferEvent::Operation(operation) => {
+                if let Some(project_id) = self.remote_id() {
+                    let request = self.client.request(proto::UpdateBuffer {
+                        project_id,
+                        buffer_id: buffer.read(cx).remote_id(),
+                        operations: vec![language::proto::serialize_operation(&operation)],
+                    });
+                    cx.foreground()
+                        .spawn(async move {
+                            request.await.log_err();
+                        })
+                        .detach();
+                }
+            }
+            _ => {}
+        }
+    }
+
     fn assign_language_to_buffer(
         &mut self,
         buffer: &ModelHandle<Buffer>,
@@ -4452,7 +4479,10 @@ mod tests {
         buffer1.update(cx, |buffer, cx| {
             cx.subscribe(&buffer1, {
                 let events = events.clone();
-                move |_, _, event, _| events.borrow_mut().push(event.clone())
+                move |_, _, event, _| match event {
+                    BufferEvent::Operation(_) => {}
+                    _ => events.borrow_mut().push(event.clone()),
+                }
             })
             .detach();
 

crates/project/src/worktree.rs 🔗

@@ -19,7 +19,7 @@ use gpui::{
 };
 use language::{
     proto::{deserialize_version, serialize_version},
-    Buffer, DiagnosticEntry, Operation, PointUtf16, Rope,
+    Buffer, DiagnosticEntry, PointUtf16, Rope,
 };
 use lazy_static::lazy_static;
 use parking_lot::Mutex;
@@ -71,7 +71,6 @@ pub struct LocalWorktree {
     share: Option<ShareState>,
     diagnostics: HashMap<Arc<Path>, Vec<DiagnosticEntry<PointUtf16>>>,
     diagnostic_summaries: TreeMap<PathKey, DiagnosticSummary>,
-    queued_operations: Vec<(u64, Operation)>,
     client: Arc<Client>,
     fs: Arc<dyn Fs>,
     visible: bool,
@@ -84,7 +83,6 @@ pub struct RemoteWorktree {
     client: Arc<Client>,
     updates_tx: UnboundedSender<proto::UpdateWorktree>,
     replica_id: ReplicaId,
-    queued_operations: Vec<(u64, Operation)>,
     diagnostic_summaries: TreeMap<PathKey, DiagnosticSummary>,
     visible: bool,
 }
@@ -226,7 +224,6 @@ impl Worktree {
                 snapshot_rx: snapshot_rx.clone(),
                 updates_tx,
                 client: client.clone(),
-                queued_operations: Default::default(),
                 diagnostic_summaries: TreeMap::from_ordered_entries(
                     worktree.diagnostic_summaries.into_iter().map(|summary| {
                         (
@@ -420,42 +417,6 @@ impl Worktree {
 
         cx.notify();
     }
-
-    fn send_buffer_update(
-        &mut self,
-        buffer_id: u64,
-        operation: Operation,
-        cx: &mut ModelContext<Self>,
-    ) {
-        if let Some((project_id, rpc)) = match self {
-            Worktree::Local(worktree) => worktree
-                .share
-                .as_ref()
-                .map(|share| (share.project_id, worktree.client.clone())),
-            Worktree::Remote(worktree) => Some((worktree.project_id, worktree.client.clone())),
-        } {
-            cx.spawn(|worktree, mut cx| async move {
-                if let Err(error) = rpc
-                    .request(proto::UpdateBuffer {
-                        project_id,
-                        buffer_id,
-                        operations: vec![language::proto::serialize_operation(&operation)],
-                    })
-                    .await
-                {
-                    worktree.update(&mut cx, |worktree, _| {
-                        log::error!("error sending buffer operation: {}", error);
-                        match worktree {
-                            Worktree::Local(t) => &mut t.queued_operations,
-                            Worktree::Remote(t) => &mut t.queued_operations,
-                        }
-                        .push((buffer_id, operation));
-                    });
-                }
-            })
-            .detach();
-        }
-    }
 }
 
 impl LocalWorktree {
@@ -526,7 +487,6 @@ impl LocalWorktree {
                 poll_task: None,
                 diagnostics: Default::default(),
                 diagnostic_summaries: Default::default(),
-                queued_operations: Default::default(),
                 client,
                 fs,
                 visible,
@@ -1455,12 +1415,6 @@ impl language::File for File {
         })
     }
 
-    fn buffer_updated(&self, buffer_id: u64, operation: Operation, cx: &mut MutableAppContext) {
-        self.worktree.update(cx, |worktree, cx| {
-            worktree.send_buffer_update(buffer_id, operation, cx);
-        });
-    }
-
     fn buffer_removed(&self, buffer_id: u64, cx: &mut MutableAppContext) {
         self.worktree.update(cx, |worktree, _| {
             if let Worktree::Remote(worktree) = worktree {

crates/server/src/rpc.rs 🔗

@@ -1034,6 +1034,7 @@ mod tests {
     use project::{
         fs::{FakeFs, Fs as _},
         search::SearchQuery,
+        worktree::WorktreeHandle,
         DiagnosticSummary, Project, ProjectPath,
     };
     use rand::prelude::*;
@@ -1411,6 +1412,8 @@ mod tests {
         buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
         buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
 
+        worktree_a.flush_fs_events(cx_a).await;
+
         // Make changes on host's file system, see those changes on guest worktrees.
         fs.rename(
             "/a/file1".as_ref(),