Respond to `proto::OpenBuffer` requests

Antonio Scandurra created

Change summary

zed-rpc/proto/zed.proto  |  47 +++++++---------
zed-rpc/src/peer.rs      |   6 +
zed-rpc/src/proto.rs     |   5 -
zed/src/editor/buffer.rs |  89 +++++++++++++++++++++++++++++++
zed/src/main.rs          |   4 
zed/src/rpc.rs           |  16 +++--
zed/src/test.rs          |   6 +-
zed/src/workspace.rs     |  95 ++++++++++++++--------------------
zed/src/worktree.rs      | 117 ++++++++++++++++++++++++++++++++---------
9 files changed, 261 insertions(+), 124 deletions(-)

Detailed changes

zed-rpc/proto/zed.proto 🔗

@@ -12,11 +12,9 @@ message Envelope {
         ShareWorktreeResponse share_worktree_response = 7;
         OpenWorktree open_worktree = 8;
         OpenWorktreeResponse open_worktree_response = 9;
-        OpenFile open_file = 10;
-        OpenFileResponse open_file_response = 11;
-        CloseFile close_file = 12;
-        OpenBuffer open_buffer = 13;
-        OpenBufferResponse open_buffer_response = 14;
+        OpenBuffer open_buffer = 10;
+        OpenBufferResponse open_buffer_response = 11;
+        CloseBuffer close_buffer = 12;
     }
 }
 
@@ -57,28 +55,18 @@ message RemoveGuest {
     uint64 worktree_id = 1;
 }
 
-message OpenFile {
+message OpenBuffer {
     uint64 worktree_id = 1;
     string path = 2;
 }
 
-message OpenFileResponse {
-    uint64 id = 1;
-    uint64 mtime = 2;
-}
-
-message CloseFile {
-    uint64 worktree_id = 1;
-    uint64 id = 2;
-}
-
-message OpenBuffer {
-    uint64 worktree_id = 1;
-    uint64 id = 2;
+message OpenBufferResponse {
+    uint64 buffer_id = 1;
+    Buffer buffer = 2;
 }
 
-message OpenBufferResponse {
-    Buffer buffer = 1;
+message CloseBuffer {
+    uint64 id = 1;
 }
 
 message User {
@@ -116,15 +104,22 @@ message Operation {
         uint32 local_timestamp = 2;
         uint32 lamport_timestamp = 3;
         repeated VectorClockEntry version = 4;
+        repeated Range ranges = 5;
+        optional string new_text = 6;
     }
+}
 
-    message VectorClockEntry {
-        uint32 replica_id = 1;
-        uint32 timestamp = 2;
-    }
+message VectorClockEntry {
+    uint32 replica_id = 1;
+    uint32 timestamp = 2;
 }
 
 message Timestamp {
     uint64 seconds = 1;
     uint32 nanos = 2;
-}
+}
+
+message Range {
+    uint64 start = 1;
+    uint64 end = 2;
+}

zed-rpc/src/peer.rs 🔗

@@ -432,9 +432,10 @@ mod tests {
             };
             let request3 = proto::OpenBuffer {
                 worktree_id: 1,
-                id: 2,
+                path: "path/two".to_string(),
             };
             let response3 = proto::OpenBufferResponse {
+                buffer_id: 2,
                 buffer: Some(proto::Buffer {
                     content: "path/two content".to_string(),
                     history: vec![],
@@ -442,9 +443,10 @@ mod tests {
             };
             let request4 = proto::OpenBuffer {
                 worktree_id: 2,
-                id: 1,
+                path: "path/one".to_string(),
             };
             let response4 = proto::OpenBufferResponse {
+                buffer_id: 1,
                 buffer: Some(proto::Buffer {
                     content: "path/one content".to_string(),
                     history: vec![],

zed-rpc/src/proto.rs 🔗

@@ -71,9 +71,8 @@ macro_rules! request_message {
 request_message!(Auth, AuthResponse);
 request_message!(ShareWorktree, ShareWorktreeResponse);
 request_message!(OpenWorktree, OpenWorktreeResponse);
-request_message!(OpenFile, OpenFileResponse);
-message!(CloseFile);
 request_message!(OpenBuffer, OpenBufferResponse);
+message!(CloseBuffer);
 
 /// A stream of protobuf messages.
 pub struct MessageStream<T> {
@@ -169,7 +168,7 @@ mod tests {
 
             let message2 = OpenBuffer {
                 worktree_id: 0,
-                id: 1,
+                path: "some/path".to_string(),
             }
             .into_envelope(5, None, None);
 

zed/src/editor/buffer.rs 🔗

@@ -11,6 +11,7 @@ use seahash::SeaHasher;
 pub use selection::*;
 use similar::{ChangeTag, TextDiff};
 use tree_sitter::{InputEdit, Parser, QueryCursor};
+use zed_rpc::proto;
 
 use crate::{
     language::{Language, Tree},
@@ -493,6 +494,94 @@ impl Buffer {
         }
     }
 
+    pub fn from_proto(
+        replica_id: ReplicaId,
+        remote_buffer: proto::Buffer,
+        file: Option<File>,
+        language: Option<Arc<Language>>,
+        cx: &mut ModelContext<Self>,
+    ) -> Result<Self> {
+        let mut buffer = Buffer::build(
+            replica_id,
+            History::new(remote_buffer.content.into()),
+            file,
+            language,
+            cx,
+        );
+        let ops = remote_buffer
+            .history
+            .into_iter()
+            .filter_map(|op| op.variant)
+            .map(|op| match op {
+                proto::operation::Variant::Edit(edit) => {
+                    let mut version = time::Global::new();
+                    for entry in edit.version {
+                        version.observe(time::Local {
+                            replica_id: entry.replica_id as ReplicaId,
+                            value: entry.timestamp,
+                        });
+                    }
+                    let ranges = edit
+                        .ranges
+                        .into_iter()
+                        .map(|range| range.start as usize..range.end as usize)
+                        .collect();
+                    Operation::Edit(EditOperation {
+                        timestamp: InsertionTimestamp {
+                            replica_id: edit.replica_id as ReplicaId,
+                            local: edit.local_timestamp,
+                            lamport: edit.lamport_timestamp,
+                        },
+                        version,
+                        ranges,
+                        new_text: edit.new_text,
+                    })
+                }
+            });
+        buffer.apply_ops(ops, cx)?;
+        Ok(buffer)
+    }
+
+    pub fn to_proto(&self) -> proto::Buffer {
+        let ops = self
+            .history
+            .ops
+            .values()
+            .map(|op| {
+                let version = op
+                    .version
+                    .iter()
+                    .map(|entry| proto::VectorClockEntry {
+                        replica_id: entry.replica_id as u32,
+                        timestamp: entry.value,
+                    })
+                    .collect();
+                let ranges = op
+                    .ranges
+                    .iter()
+                    .map(|range| proto::Range {
+                        start: range.start as u64,
+                        end: range.end as u64,
+                    })
+                    .collect();
+                proto::Operation {
+                    variant: Some(proto::operation::Variant::Edit(proto::operation::Edit {
+                        replica_id: op.timestamp.replica_id as u32,
+                        local_timestamp: op.timestamp.local,
+                        lamport_timestamp: op.timestamp.lamport,
+                        version,
+                        ranges,
+                        new_text: op.new_text.clone(),
+                    })),
+                }
+            })
+            .collect();
+        proto::Buffer {
+            content: self.history.base_text.to_string(),
+            history: ops,
+        }
+    }
+
     pub fn file(&self) -> Option<&File> {
         self.file.as_ref()
     }

zed/src/main.rs 🔗

@@ -21,9 +21,9 @@ fn main() {
     language_registry.set_theme(&settings.borrow().theme);
 
     let app_state = AppState {
-        language_registry,
+        language_registry: language_registry.clone(),
         settings,
-        rpc: rpc::Client::new(),
+        rpc: rpc::Client::new(language_registry),
     };
 
     app.run(move |cx| {

zed/src/rpc.rs 🔗

@@ -1,6 +1,5 @@
-use crate::worktree::{File, Worktree};
-
 use super::util::SurfResultExt as _;
+use crate::{editor::Buffer, language::LanguageRegistry, worktree::Worktree};
 use anyhow::{anyhow, Context, Result};
 use gpui::executor::Background;
 use gpui::{AsyncAppContext, ModelHandle, Task};
@@ -29,18 +28,23 @@ pub struct Client {
     pub state: Arc<Mutex<ClientState>>,
 }
 
-#[derive(Default)]
 pub struct ClientState {
     connection_id: Option<ConnectionId>,
     pub shared_worktrees: HashMap<u64, ModelHandle<Worktree>>,
-    pub shared_files: HashMap<File, HashMap<PeerId, usize>>,
+    pub shared_buffers: HashMap<PeerId, HashMap<usize, ModelHandle<Buffer>>>,
+    pub language_registry: Arc<LanguageRegistry>,
 }
 
 impl Client {
-    pub fn new() -> Self {
+    pub fn new(language_registry: Arc<LanguageRegistry>) -> Self {
         Self {
             peer: Peer::new(),
-            state: Default::default(),
+            state: Arc::new(Mutex::new(ClientState {
+                connection_id: None,
+                shared_worktrees: Default::default(),
+                shared_buffers: Default::default(),
+                language_registry,
+            })),
         }
     }
 

zed/src/test.rs 🔗

@@ -1,4 +1,4 @@
-use crate::{AppState, language::LanguageRegistry, rpc, settings, time::ReplicaId};
+use crate::{language::LanguageRegistry, rpc, settings, time::ReplicaId, AppState};
 use ctor::ctor;
 use gpui::AppContext;
 use rand::Rng;
@@ -149,7 +149,7 @@ pub fn build_app_state(cx: &AppContext) -> AppState {
     let language_registry = Arc::new(LanguageRegistry::new());
     AppState {
         settings,
-        language_registry,
-        rpc: rpc::Client::new(),
+        language_registry: language_registry.clone(),
+        rpc: rpc::Client::new(language_registry),
     }
 }

zed/src/workspace.rs 🔗

@@ -44,9 +44,8 @@ pub fn init(cx: &mut MutableAppContext, rpc: rpc::Client) {
     ]);
     pane::init(cx);
 
-    rpc.on_message(remote::open_file, cx);
-    rpc.on_message(remote::close_file, cx);
     rpc.on_message(remote::open_buffer, cx);
+    rpc.on_message(remote::close_buffer, cx);
 }
 
 pub struct OpenParams {
@@ -110,41 +109,52 @@ fn open_paths(params: &OpenParams, cx: &mut MutableAppContext) {
 mod remote {
     use super::*;
 
-    pub async fn open_file(
-        request: TypedEnvelope<proto::OpenFile>,
+    pub async fn open_buffer(
+        request: TypedEnvelope<proto::OpenBuffer>,
         rpc: &rpc::Client,
         cx: &mut AsyncAppContext,
     ) -> anyhow::Result<()> {
-        // let message = &request.payload;
-        // let peer_id = request
-        //     .original_sender_id
-        //     .ok_or_else(|| anyhow!("missing original sender id"))?;
-
-        // let mut state = rpc.state.lock().await;
-        // let worktree = state
-        //     .shared_worktrees
-        //     .get(&message.worktree_id)
-        //     .ok_or_else(|| anyhow!("worktree {} not found", message.worktree_id))?
-        //     .clone();
-
-        // let file = worktree.file(&message.path);
-        // let id = file.id() as u64;
-        // let mtime = file.mtime().as_secs();
+        let message = &request.payload;
+        let peer_id = request
+            .original_sender_id
+            .ok_or_else(|| anyhow!("missing original sender id"))?;
 
-        // *state
-        //     .shared_files
-        //     .entry(file)
-        //     .or_insert(Default::default())
-        //     .entry(peer_id)
-        //     .or_insert(0) += 1;
+        let mut state = rpc.state.lock().await;
+        let worktree = state
+            .shared_worktrees
+            .get(&message.worktree_id)
+            .ok_or_else(|| anyhow!("worktree {} not found", message.worktree_id))?
+            .clone();
+
+        let buffer = worktree
+            .update(cx, |worktree, cx| {
+                worktree.open_buffer(
+                    Path::new(&message.path),
+                    state.language_registry.clone(),
+                    cx,
+                )
+            })
+            .await?;
+        state
+            .shared_buffers
+            .entry(peer_id)
+            .or_default()
+            .insert(buffer.id(), buffer.clone());
+
+        rpc.respond(
+            request.receipt(),
+            proto::OpenBufferResponse {
+                buffer_id: buffer.id() as u64,
+                buffer: Some(buffer.read_with(cx, |buf, _| buf.to_proto())),
+            },
+        )
+        .await?;
 
-        // rpc.respond(request.receipt(), proto::OpenFileResponse { id, mtime })
-        //     .await?;
         Ok(())
     }
 
-    pub async fn close_file(
-        request: TypedEnvelope<proto::CloseFile>,
+    pub async fn close_buffer(
+        request: TypedEnvelope<proto::CloseBuffer>,
         rpc: &rpc::Client,
         _: &mut AsyncAppContext,
     ) -> anyhow::Result<()> {
@@ -169,33 +179,6 @@ mod remote {
 
         Ok(())
     }
-
-    pub async fn open_buffer(
-        request: TypedEnvelope<proto::OpenBuffer>,
-        rpc: &rpc::Client,
-        cx: &mut AsyncAppContext,
-    ) -> anyhow::Result<()> {
-        // let message = &request.payload;
-        // let handle = {
-        //     let state = rpc.state.lock().await;
-        //     let mut files = state.shared_files.keys();
-        //     files.find(|file| file.id() == message.id).cloned()
-        // };
-        // let buffer = if let Some(handle) = handle {
-        //     let history = cx.read(|cx| handle.load_history(cx)).await?;
-        //     Some(proto::Buffer {
-        //         content: history.base_text.to_string(),
-        //         history: Vec::new(),
-        //     })
-        // } else {
-        //     None
-        // };
-
-        // rpc.respond(request.receipt(), proto::OpenBufferResponse { buffer })
-        //     .await?;
-
-        Ok(())
-    }
 }
 
 pub trait Item: Entity + Sized {

zed/src/worktree.rs 🔗

@@ -65,7 +65,7 @@ impl Worktree {
     }
 
     pub fn remote(
-        id: usize,
+        id: u64,
         worktree: proto::Worktree,
         rpc: rpc::Client,
         connection_id: ConnectionId,
@@ -98,6 +98,14 @@ impl Worktree {
         }
     }
 
+    pub fn as_remote_mut(&mut self) -> Option<&mut RemoteWorktree> {
+        if let Worktree::Remote(worktree) = self {
+            Some(worktree)
+        } else {
+            None
+        }
+    }
+
     pub fn snapshot(&self) -> Snapshot {
         match self {
             Worktree::Local(worktree) => worktree.snapshot(),
@@ -110,10 +118,12 @@ impl Worktree {
         path: impl AsRef<Path>,
         language_registry: Arc<LanguageRegistry>,
         cx: &mut ModelContext<Self>,
-    ) -> impl Future<Output = Result<ModelHandle<Buffer>>> + 'static {
+    ) -> Task<Result<ModelHandle<Buffer>>> {
         match self {
             Worktree::Local(worktree) => worktree.open_buffer(path.as_ref(), language_registry, cx),
-            Worktree::Remote(_) => todo!(),
+            Worktree::Remote(worktree) => {
+                worktree.open_buffer(path.as_ref(), language_registry, cx)
+            }
         }
     }
 
@@ -148,7 +158,7 @@ pub struct LocalWorktree {
     _event_stream_handle: fsevent::Handle,
     poll_scheduled: bool,
     rpc: Option<rpc::Client>,
-    open_buffers: HashSet<WeakModelHandle<Buffer>>,
+    open_buffers: HashMap<usize, WeakModelHandle<Buffer>>,
 }
 
 impl LocalWorktree {
@@ -220,12 +230,12 @@ impl LocalWorktree {
         path: &Path,
         language_registry: Arc<LanguageRegistry>,
         cx: &mut ModelContext<Worktree>,
-    ) -> impl Future<Output = Result<ModelHandle<Buffer>>> + 'static {
+    ) -> Task<Result<ModelHandle<Buffer>>> {
         let handle = cx.handle();
 
         // If there is already a buffer for the given path, then return it.
         let mut existing_buffer = None;
-        self.open_buffers.retain(|buffer| {
+        self.open_buffers.retain(|_buffer_id, buffer| {
             if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
                 if let Some(file) = buffer.read(cx.as_ref()).file() {
                     if file.worktree_id() == handle.id() && file.path.as_ref() == path {
@@ -238,12 +248,16 @@ impl LocalWorktree {
             }
         });
 
-        let mut new_buffer = None;
-        if existing_buffer.is_none() {
-            let path = Arc::from(path);
-            let contents = self.load(&path, cx.as_ref());
-            new_buffer = Some(cx.spawn(|this, mut cx| async move {
-                let contents = contents.await?;
+        let path = Arc::from(path);
+        cx.spawn(|this, mut cx| async move {
+            if let Some(existing_buffer) = existing_buffer {
+                Ok(existing_buffer)
+            } else {
+                let contents = this
+                    .read_with(&cx, |this, cx| {
+                        this.as_local().unwrap().load(&path, cx.as_ref())
+                    })
+                    .await?;
                 let language = language_registry.select_language(&path).cloned();
                 let file = File::new(handle, path.into());
                 let buffer = cx.add_model(|cx| {
@@ -251,19 +265,11 @@ impl LocalWorktree {
                 });
                 this.update(&mut cx, |this, _| {
                     let this = this.as_local_mut().unwrap();
-                    this.open_buffers.insert(buffer.downgrade());
+                    this.open_buffers.insert(buffer.id(), buffer.downgrade());
                 });
                 Ok(buffer)
-            }));
-        }
-
-        async move {
-            if let Some(existing_buffer) = existing_buffer {
-                Ok(existing_buffer)
-            } else {
-                new_buffer.unwrap().await
             }
-        }
+        })
     }
 
     pub fn scan_complete(&self) -> impl Future<Output = ()> {
@@ -292,7 +298,7 @@ impl LocalWorktree {
 
     fn observe_snapshot_diff(&mut self, diff: Diff, cx: &mut ModelContext<Worktree>) {
         let handle = cx.handle();
-        self.open_buffers.retain(|buffer| {
+        self.open_buffers.retain(|_buffer_id, buffer| {
             if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
                 buffer.update(cx, |buffer, cx| {
                     let handle = handle.clone();
@@ -506,16 +512,17 @@ impl fmt::Debug for LocalWorktree {
 }
 
 pub struct RemoteWorktree {
-    remote_id: usize,
+    remote_id: u64,
     snapshot: Snapshot,
     rpc: rpc::Client,
     connection_id: ConnectionId,
     replica_id: ReplicaId,
+    open_buffers: HashMap<u64, WeakModelHandle<Buffer>>,
 }
 
 impl RemoteWorktree {
     fn new(
-        remote_id: usize,
+        remote_id: u64,
         worktree: proto::Worktree,
         rpc: rpc::Client,
         connection_id: ConnectionId,
@@ -568,8 +575,64 @@ impl RemoteWorktree {
             rpc,
             connection_id,
             replica_id,
+            open_buffers: Default::default(),
         }
     }
+
+    pub fn open_buffer(
+        &mut self,
+        path: &Path,
+        language_registry: Arc<LanguageRegistry>,
+        cx: &mut ModelContext<Worktree>,
+    ) -> Task<Result<ModelHandle<Buffer>>> {
+        let handle = cx.handle();
+        let mut existing_buffer = None;
+        self.open_buffers.retain(|_buffer_id, buffer| {
+            if let Some(buffer) = buffer.upgrade(cx.as_ref()) {
+                if let Some(file) = buffer.read(cx.as_ref()).file() {
+                    if file.worktree_id() == handle.id() && file.path.as_ref() == path {
+                        existing_buffer = Some(buffer);
+                    }
+                }
+                true
+            } else {
+                false
+            }
+        });
+
+        let rpc = self.rpc.clone();
+        let replica_id = self.replica_id;
+        let connection_id = self.connection_id;
+        let remote_worktree_id = self.remote_id;
+        let path = path.to_string_lossy().to_string();
+        cx.spawn(|this, mut cx| async move {
+            if let Some(existing_buffer) = existing_buffer {
+                Ok(existing_buffer)
+            } else {
+                let file = File::new(handle, Path::new(&path).into());
+                let language = language_registry.select_language(&path).cloned();
+                let response = rpc
+                    .request(
+                        connection_id,
+                        proto::OpenBuffer {
+                            worktree_id: remote_worktree_id as u64,
+                            path,
+                        },
+                    )
+                    .await?;
+                let buffer_id = response.buffer_id;
+                let remote_buffer = response.buffer.ok_or_else(|| anyhow!("empty buffer"))?;
+                let buffer = cx.add_model(|cx| {
+                    Buffer::from_proto(replica_id, remote_buffer, Some(file), language, cx).unwrap()
+                });
+                this.update(&mut cx, |this, _| {
+                    let this = this.as_remote_mut().unwrap();
+                    this.open_buffers.insert(buffer_id, buffer.downgrade());
+                });
+                Ok(buffer)
+            }
+        })
+    }
 }
 
 #[derive(Clone)]
@@ -833,7 +896,9 @@ impl File {
     pub fn saved_buffer(&self, buffer: ModelHandle<Buffer>, cx: &mut MutableAppContext) {
         self.worktree.update(cx, |worktree, _| {
             if let Worktree::Local(worktree) = worktree {
-                worktree.open_buffers.insert(buffer.downgrade());
+                worktree
+                    .open_buffers
+                    .insert(buffer.id(), buffer.downgrade());
             }
         })
     }