WIP: Save remote buffers

Antonio Scandurra created

Change summary

zed-rpc/proto/zed.proto  |  17 ++++
zed-rpc/src/proto.rs     |   1 
zed/src/editor.rs        |   6 +
zed/src/editor/buffer.rs |  14 +--
zed/src/worktree.rs      | 130 ++++++++++++++++++++++++++++++++++++-----
5 files changed, 140 insertions(+), 28 deletions(-)

Detailed changes

zed-rpc/proto/zed.proto 🔗

@@ -17,8 +17,10 @@ message Envelope {
         OpenBufferResponse open_buffer_response = 12;
         CloseBuffer close_buffer = 13;
         UpdateBuffer update_buffer = 14;
-        AddGuest add_guest = 15;
-        RemoveGuest remove_guest = 16;
+        SaveBuffer save_buffer = 15;
+        BufferSaved buffer_saved = 16;
+        AddGuest add_guest = 17;
+        RemoveGuest remove_guest = 18;
     }
 }
 
@@ -90,6 +92,17 @@ message UpdateBuffer {
     repeated Operation operations = 3;
 }
 
+message SaveBuffer {
+    uint64 worktree_id = 1;
+    uint64 buffer_id = 2;
+}
+
+message BufferSaved {
+    uint64 worktree_id = 1;
+    uint64 buffer_id = 2;
+    repeated VectorClockEntry version = 3;
+}
+
 message User {
     uint64 id = 1;
     string github_login = 2;

zed-rpc/src/proto.rs 🔗

@@ -75,6 +75,7 @@ message!(CloseWorktree);
 request_message!(OpenBuffer, OpenBufferResponse);
 message!(CloseBuffer);
 message!(UpdateBuffer);
+request_message!(SaveBuffer, BufferSaved);
 message!(AddGuest);
 message!(RemoveGuest);
 

zed/src/editor.rs 🔗

@@ -2538,7 +2538,11 @@ impl workspace::ItemView for Editor {
     }
 
     fn save(&mut self, cx: &mut ViewContext<Self>) -> Result<Task<Result<()>>> {
-        self.buffer.update(cx, |b, cx| b.save(cx))
+        let save = self.buffer.update(cx, |b, cx| b.save(cx))?;
+        Ok(cx.spawn(|_, _| async move {
+            save.await?;
+            Ok(())
+        }))
     }
 
     fn save_as(

zed/src/editor/buffer.rs 🔗

@@ -666,22 +666,20 @@ impl Buffer {
         self.file.as_mut()
     }
 
-    pub fn save(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<()>>> {
+    pub fn save(&mut self, cx: &mut ModelContext<Self>) -> Result<Task<Result<time::Global>>> {
         let file = self
             .file
             .as_ref()
             .ok_or_else(|| anyhow!("buffer has no file"))?;
-
         let text = self.visible_text.clone();
         let version = self.version.clone();
-        let save = file.save(text, cx.as_mut());
-
+        let save = file.save(self.remote_id, text, version, cx.as_mut());
         Ok(cx.spawn(|this, mut cx| async move {
-            save.await?;
+            let version = save.await?;
             this.update(&mut cx, |this, cx| {
-                this.did_save(version, cx).unwrap();
+                this.did_save(version.clone(), cx).unwrap();
             });
-            Ok(())
+            Ok(version)
         }))
     }
 
@@ -711,7 +709,7 @@ impl Buffer {
         })
     }
 
-    fn did_save(&mut self, version: time::Global, cx: &mut ModelContext<Self>) -> Result<()> {
+    pub fn did_save(&mut self, version: time::Global, cx: &mut ModelContext<Self>) -> Result<()> {
         if let Some(file) = self.file.as_ref() {
             self.saved_mtime = file.mtime;
             self.saved_version = version;

zed/src/worktree.rs 🔗

@@ -8,7 +8,7 @@ use crate::{
     language::LanguageRegistry,
     rpc::{self, proto},
     sum_tree::{self, Cursor, Edit, SumTree},
-    time::ReplicaId,
+    time::{self, ReplicaId},
     util::Bias,
 };
 use ::ignore::gitignore::Gitignore;
@@ -58,6 +58,7 @@ pub fn init(cx: &mut MutableAppContext, rpc: rpc::Client) {
     rpc.on_message(remote::open_buffer, cx);
     rpc.on_message(remote::close_buffer, cx);
     rpc.on_message(remote::update_buffer, cx);
+    rpc.on_message(remote::buffer_saved, cx);
 }
 
 #[derive(Clone, Debug)]
@@ -274,21 +275,28 @@ impl Worktree {
         Ok(())
     }
 
-    fn save(
-        &self,
-        path: &Path,
-        text: Rope,
+    pub fn buffer_saved(
+        &mut self,
+        message: proto::BufferSaved,
         cx: &mut ModelContext<Self>,
-    ) -> impl Future<Output = Result<()>> {
-        match self {
-            Worktree::Local(worktree) => {
-                let save = worktree.save(path, text, cx);
-                async move {
-                    save.await?;
-                    Ok(())
-                }
+    ) -> Result<()> {
+        if let Worktree::Remote(worktree) = self {
+            if let Some(buffer) = worktree
+                .open_buffers
+                .get(&(message.buffer_id as usize))
+                .and_then(|buf| buf.upgrade(&cx))
+            {
+                buffer.update(cx, |buffer, cx| {
+                    buffer.did_save(message.version.try_into()?, cx);
+                    Result::<_, anyhow::Error>::Ok(())
+                })?;
             }
-            Worktree::Remote(_) => todo!(),
+            Ok(())
+        } else {
+            Err(anyhow!(
+                "invalid buffer {} in buffer saved message",
+                message.buffer_id
+            ))
         }
     }
 }
@@ -649,6 +657,20 @@ impl LocalWorktree {
         })
     }
 
+    pub fn save_remote_buffer(
+        &self,
+        envelope: &TypedEnvelope<proto::SaveBuffer>,
+        cx: &mut ModelContext<Worktree>,
+    ) -> Result<Task<Result<time::Global>>> {
+        let sender_id = envelope.original_sender_id()?;
+        let buffer = self
+            .shared_buffers
+            .get_mut(&sender_id)
+            .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id))
+            .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;
+        buffer.update(cx, |buffer, cx| buffer.save(cx))
+    }
+
     pub fn save_buffer_as(
         &self,
         buffer: ModelHandle<Buffer>,
@@ -1248,9 +1270,44 @@ impl File {
         !self.is_deleted()
     }
 
-    pub fn save(&self, text: Rope, cx: &mut MutableAppContext) -> impl Future<Output = Result<()>> {
-        self.worktree
-            .update(cx, |worktree, cx| worktree.save(&self.path, text, cx))
+    pub fn save(
+        &self,
+        buffer_id: u64,
+        text: Rope,
+        version: time::Global,
+        cx: &mut MutableAppContext,
+    ) -> Task<Result<time::Global>> {
+        self.worktree.update(cx, |worktree, cx| match worktree {
+            Worktree::Local(worktree) => {
+                let rpc = worktree.rpc.clone();
+                let save = worktree.save(self.path.clone(), text, cx);
+                cx.spawn(|_, _| async move {
+                    save.await?;
+                    if let Some((rpc, worktree_id)) = rpc {
+                        rpc.send(proto::BufferSaved {
+                            worktree_id,
+                            buffer_id,
+                            version: (&version).into(),
+                        })
+                        .await?;
+                    }
+                    Ok(version)
+                })
+            }
+            Worktree::Remote(worktree) => {
+                let rpc = worktree.rpc.clone();
+                let worktree_id = worktree.remote_id;
+                cx.spawn(|_, _| async move {
+                    let response = rpc
+                        .request(proto::SaveBuffer {
+                            worktree_id,
+                            buffer_id,
+                        })
+                        .await?;
+                    Ok(response.version.try_into()?)
+                })
+            }
+        })
     }
 
     pub fn worktree_id(&self) -> usize {
@@ -2138,6 +2195,45 @@ mod remote {
         worktree.update(cx, |tree, cx| tree.update_buffer(message, cx))?;
         Ok(())
     }
+
+    pub async fn save_buffer(
+        envelope: TypedEnvelope<proto::SaveBuffer>,
+        rpc: &rpc::Client,
+        cx: &mut AsyncAppContext,
+    ) -> anyhow::Result<()> {
+        let mut state = rpc.state.lock().await;
+        let worktree = state.shared_worktree(envelope.payload.worktree_id, cx)?;
+        let version = worktree
+            .update(cx, |tree, cx| {
+                tree.as_local_mut()
+                    .unwrap()
+                    .save_remote_buffer(&envelope, cx)
+            })?
+            .await?;
+        rpc.respond(
+            envelope.receipt(),
+            proto::BufferSaved {
+                worktree_id: envelope.payload.worktree_id,
+                buffer_id: envelope.payload.buffer_id,
+                version: (&version).into(),
+            },
+        )
+        .await?;
+        Ok(())
+    }
+
+    pub async fn buffer_saved(
+        envelope: TypedEnvelope<proto::BufferSaved>,
+        rpc: &rpc::Client,
+        cx: &mut AsyncAppContext,
+    ) -> anyhow::Result<()> {
+        let mut state = rpc.state.lock().await;
+        let worktree = state.shared_worktree(envelope.payload.worktree_id, cx)?;
+        worktree.update(cx, |worktree, cx| {
+            worktree.buffer_saved(envelope.payload, cx)
+        })?;
+        Ok(())
+    }
 }
 
 #[cfg(test)]