Maintain remote buffers via `UpdateBufferFile` messages sent by host

Antonio Scandurra created

Change summary

crates/language/src/buffer.rs | 14 +++++--
crates/project/src/project.rs | 65 ++++++++++++++++++++++++++++++------
crates/rpc/src/proto.rs       |  1 
crates/server/src/rpc.rs      | 58 +++++++++++++++++++++++++++++---
4 files changed, 115 insertions(+), 23 deletions(-)

Detailed changes

crates/language/src/buffer.rs 🔗

@@ -668,10 +668,14 @@ impl Buffer {
         &mut self,
         new_file: Box<dyn File>,
         cx: &mut ModelContext<Self>,
-    ) -> Option<Task<()>> {
-        let old_file = self.file.as_ref()?;
+    ) -> Task<()> {
+        let old_file = if let Some(file) = self.file.as_ref() {
+            file
+        } else {
+            return Task::ready(());
+        };
         let mut file_changed = false;
-        let mut task = None;
+        let mut task = Task::ready(());
 
         if new_file.path() != old_file.path() {
             file_changed = true;
@@ -690,7 +694,7 @@ impl Buffer {
                 file_changed = true;
 
                 if !self.is_dirty() {
-                    task = Some(cx.spawn(|this, mut cx| {
+                    task = cx.spawn(|this, mut cx| {
                         async move {
                             let new_text = this.read_with(&cx, |this, cx| {
                                 this.file
@@ -714,7 +718,7 @@ impl Buffer {
                         }
                         .log_err()
                         .map(drop)
-                    }));
+                    });
                 }
             }
         }

crates/project/src/project.rs 🔗

@@ -298,6 +298,7 @@ impl Project {
                         Self::handle_disk_based_diagnostics_updated,
                     ),
                     client.subscribe_to_entity(remote_id, cx, Self::handle_update_buffer),
+                    client.subscribe_to_entity(remote_id, cx, Self::handle_update_buffer_file),
                     client.subscribe_to_entity(remote_id, cx, Self::handle_buffer_saved),
                 ],
                 client,
@@ -1136,10 +1137,12 @@ impl Project {
 
     fn add_worktree(&mut self, worktree: &ModelHandle<Worktree>, cx: &mut ModelContext<Self>) {
         cx.observe(&worktree, |_, _, cx| cx.notify()).detach();
-        cx.subscribe(&worktree, |this, worktree, _, cx| {
-            this.update_open_buffers(worktree, cx)
-        })
-        .detach();
+        if worktree.read(cx).is_local() {
+            cx.subscribe(&worktree, |this, worktree, _, cx| {
+                this.update_local_worktree_buffers(worktree, cx);
+            })
+            .detach();
+        }
 
         let push_weak_handle = {
             let worktree = worktree.read(cx);
@@ -1161,12 +1164,11 @@ impl Project {
         cx.notify();
     }
 
-    fn update_open_buffers(
+    fn update_local_worktree_buffers(
         &mut self,
         worktree_handle: ModelHandle<Worktree>,
         cx: &mut ModelContext<Self>,
     ) {
-        let local = worktree_handle.read(cx).is_local();
         let snapshot = worktree_handle.read(cx).snapshot();
         let mut buffers_to_delete = Vec::new();
         for (buffer_id, buffer) in &self.open_buffers {
@@ -1183,7 +1185,7 @@ impl Project {
                                 .and_then(|entry_id| snapshot.entry_for_id(entry_id))
                             {
                                 File {
-                                    is_local: local,
+                                    is_local: true,
                                     entry_id: Some(entry.id),
                                     mtime: entry.mtime,
                                     path: entry.path.clone(),
@@ -1193,7 +1195,7 @@ impl Project {
                                 snapshot.entry_for_path(old_file.path().as_ref())
                             {
                                 File {
-                                    is_local: local,
+                                    is_local: true,
                                     entry_id: Some(entry.id),
                                     mtime: entry.mtime,
                                     path: entry.path.clone(),
@@ -1201,7 +1203,7 @@ impl Project {
                                 }
                             } else {
                                 File {
-                                    is_local: local,
+                                    is_local: true,
                                     entry_id: None,
                                     path: old_file.path().clone(),
                                     mtime: old_file.mtime(),
@@ -1209,9 +1211,18 @@ impl Project {
                                 }
                             };
 
-                            if let Some(task) = buffer.file_updated(Box::new(new_file), cx) {
-                                task.detach();
+                            if let Some(project_id) = self.remote_id() {
+                                let client = self.client.clone();
+                                let message = proto::UpdateBufferFile {
+                                    project_id,
+                                    buffer_id: *buffer_id as u64,
+                                    file: Some(new_file.to_proto()),
+                                };
+                                cx.foreground()
+                                    .spawn(async move { client.send(message).await })
+                                    .detach_and_log_err(cx);
                             }
+                            buffer.file_updated(Box::new(new_file), cx).detach();
                         }
                     });
                 } else {
@@ -1492,6 +1503,31 @@ impl Project {
         Ok(())
     }
 
+    pub fn handle_update_buffer_file(
+        &mut self,
+        envelope: TypedEnvelope<proto::UpdateBufferFile>,
+        _: Arc<Client>,
+        cx: &mut ModelContext<Self>,
+    ) -> Result<()> {
+        let payload = envelope.payload.clone();
+        let buffer_id = payload.buffer_id as usize;
+        let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?;
+        let worktree = self
+            .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx)
+            .ok_or_else(|| anyhow!("no such worktree"))?;
+        let file = File::from_proto(file, worktree.clone(), cx)?;
+        let buffer = self
+            .open_buffers
+            .get_mut(&buffer_id)
+            .and_then(|b| b.upgrade(cx))
+            .ok_or_else(|| anyhow!("no such buffer"))?;
+        buffer.update(cx, |buffer, cx| {
+            buffer.file_updated(Box::new(file), cx).detach();
+        });
+
+        Ok(())
+    }
+
     pub fn handle_save_buffer(
         &mut self,
         envelope: TypedEnvelope<proto::SaveBuffer>,
@@ -2181,7 +2217,12 @@ mod tests {
         cx.update(|cx| {
             let target_buffer = definition.target_buffer.read(cx);
             assert_eq!(
-                target_buffer.file().unwrap().as_local().unwrap().abs_path(cx),
+                target_buffer
+                    .file()
+                    .unwrap()
+                    .as_local()
+                    .unwrap()
+                    .abs_path(cx),
                 dir.path().join("a.rs")
             );
             assert_eq!(definition.target_range.to_offset(target_buffer), 9..10);

crates/rpc/src/proto.rs 🔗

@@ -198,6 +198,7 @@ entity_messages!(
     UnregisterWorktree,
     UnshareProject,
     UpdateBuffer,
+    UpdateBufferFile,
     UpdateDiagnosticSummary,
     UpdateWorktree,
 );

crates/server/src/rpc.rs 🔗

@@ -77,6 +77,7 @@ impl Server {
             .add_handler(Server::open_buffer)
             .add_handler(Server::close_buffer)
             .add_handler(Server::update_buffer)
+            .add_handler(Server::update_buffer_file)
             .add_handler(Server::buffer_saved)
             .add_handler(Server::save_buffer)
             .add_handler(Server::format_buffer)
@@ -704,6 +705,22 @@ impl Server {
         Ok(())
     }
 
+    async fn update_buffer_file(
+        self: Arc<Server>,
+        request: TypedEnvelope<proto::UpdateBufferFile>,
+    ) -> tide::Result<()> {
+        let receiver_ids = self
+            .state()
+            .project_connection_ids(request.payload.project_id, request.sender_id)
+            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
+        broadcast(request.sender_id, receiver_ids, |connection_id| {
+            self.peer
+                .forward_send(request.sender_id, connection_id, request.payload.clone())
+        })
+        .await?;
+        Ok(())
+    }
+
     async fn buffer_saved(
         self: Arc<Server>,
         request: TypedEnvelope<proto::BufferSaved>,
@@ -1470,9 +1487,7 @@ mod tests {
             .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
             .await;
         buffer_b
-            .condition(&mut cx_b, |buf, _| {
-                dbg!(buf.text()) == "i-am-c, i-am-b, i-am-a"
-            })
+            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
             .await;
         buffer_c
             .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
@@ -1490,7 +1505,10 @@ mod tests {
         buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
         buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
 
-        // Make changes on host's file system, see those changes on the guests.
+        // Make changes on host's file system, see those changes on guest worktrees.
+        fs.rename("/a/file1".as_ref(), "/a/file1-renamed".as_ref())
+            .await
+            .unwrap();
         fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
             .await
             .unwrap();
@@ -1498,18 +1516,29 @@ mod tests {
             .await
             .unwrap();
 
+        worktree_a
+            .condition(&cx_a, |tree, _| tree.file_count() == 4)
+            .await;
         worktree_b
             .condition(&cx_b, |tree, _| tree.file_count() == 4)
             .await;
         worktree_c
             .condition(&cx_c, |tree, _| tree.file_count() == 4)
             .await;
+        worktree_a.read_with(&cx_a, |tree, _| {
+            assert_eq!(
+                tree.paths()
+                    .map(|p| p.to_string_lossy())
+                    .collect::<Vec<_>>(),
+                &[".zed.toml", "file1-renamed", "file3", "file4"]
+            )
+        });
         worktree_b.read_with(&cx_b, |tree, _| {
             assert_eq!(
                 tree.paths()
                     .map(|p| p.to_string_lossy())
                     .collect::<Vec<_>>(),
-                &[".zed.toml", "file1", "file3", "file4"]
+                &[".zed.toml", "file1-renamed", "file3", "file4"]
             )
         });
         worktree_c.read_with(&cx_c, |tree, _| {
@@ -1517,9 +1546,26 @@ mod tests {
                 tree.paths()
                     .map(|p| p.to_string_lossy())
                     .collect::<Vec<_>>(),
-                &[".zed.toml", "file1", "file3", "file4"]
+                &[".zed.toml", "file1-renamed", "file3", "file4"]
             )
         });
+
+        // Ensure buffer files are updated as well.
+        buffer_a
+            .condition(&cx_a, |buf, _| {
+                buf.file().unwrap().path().to_str() == Some("file1-renamed")
+            })
+            .await;
+        buffer_b
+            .condition(&cx_b, |buf, _| {
+                buf.file().unwrap().path().to_str() == Some("file1-renamed")
+            })
+            .await;
+        buffer_c
+            .condition(&cx_c, |buf, _| {
+                buf.file().unwrap().path().to_str() == Some("file1-renamed")
+            })
+            .await;
     }
 
     #[gpui::test]