WIP

Nathan Sobo and Max Brunsfeld created

Co-Authored-By: Max Brunsfeld <maxbrunsfeld@gmail.com>

Change summary

zed-rpc/proto/zed.proto |  38 +++++++++++----
zed-rpc/src/lib.rs      |   2 
zed-rpc/src/peer.rs     | 103 +++++++++++++++++++++++++++++++++++-------
zed-rpc/src/proto.rs    |  19 ++++++-
zed/src/rpc.rs          |  37 ++++++++++-----
zed/src/workspace.rs    |  64 +++++++++++++++++++++-----
zed/src/worktree.rs     |  34 ++++++++++++-
7 files changed, 235 insertions(+), 62 deletions(-)

Detailed changes

zed-rpc/proto/zed.proto 🔗

@@ -4,18 +4,19 @@ package zed.messages;
 message Envelope {
     uint32 id = 1;
     optional uint32 responding_to = 2;
+    optional uint32 original_sender_id = 3;
     oneof payload {
-        Auth auth = 3;
-        AuthResponse auth_response = 4;
-        ShareWorktree share_worktree = 5;
-        ShareWorktreeResponse share_worktree_response = 6;
-        OpenWorktree open_worktree = 7;
-        OpenWorktreeResponse open_worktree_response = 8;
-        OpenFile open_file = 9;
-        OpenFileResponse open_file_response = 10;
-        CloseFile close_file = 11;
-        OpenBuffer open_buffer = 12;
-        OpenBufferResponse open_buffer_response = 13;
+        Auth auth = 4;
+        AuthResponse auth_response = 5;
+        ShareWorktree share_worktree = 6;
+        ShareWorktreeResponse share_worktree_response = 7;
+        OpenWorktree open_worktree = 8;
+        OpenWorktreeResponse open_worktree_response = 9;
+        OpenFile open_file = 10;
+        OpenFileResponse open_file_response = 11;
+        CloseFile close_file = 12;
+        OpenBuffer open_buffer = 13;
+        OpenBufferResponse open_buffer_response = 14;
     }
 }
 
@@ -46,6 +47,15 @@ message OpenWorktreeResponse {
     Worktree worktree = 1;
 }
 
+message AddGuest {
+    uint64 worktree_id = 1;
+    User user = 2;
+}
+
+message RemoveGuest {
+    uint64 worktree_id = 1;
+}
+
 message OpenFile {
     uint64 worktree_id = 1;
     string path = 2;
@@ -69,6 +79,12 @@ message OpenBufferResponse {
     Buffer buffer = 1;
 }
 
+message User {
+    string github_login = 1;
+    string avatar_url = 2;
+    uint64 id = 3;
+}
+
 message Worktree {
     string root_name = 1;
     repeated Entry entries = 2;

zed-rpc/src/lib.rs 🔗

@@ -3,4 +3,4 @@ mod peer;
 pub mod proto;
 pub mod rest;
 
-pub use peer::{ConnectionId, Peer, TypedEnvelope};
+pub use peer::*;

zed-rpc/src/peer.rs 🔗

@@ -14,6 +14,7 @@ use std::{
     collections::{HashMap, HashSet},
     fmt,
     future::Future,
+    marker::PhantomData,
     pin::Pin,
     sync::{
         atomic::{self, AtomicU32},
@@ -27,6 +28,9 @@ type BoxedReader = Pin<Box<dyn AsyncRead + 'static + Send>>;
 #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
 pub struct ConnectionId(u32);
 
+#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
+pub struct PeerId(u32);
+
 struct Connection {
     writer: Mutex<MessageStream<BoxedWriter>>,
     reader: Mutex<MessageStream<BoxedReader>>,
@@ -38,12 +42,30 @@ type MessageHandler = Box<
     dyn Send + Sync + Fn(&mut Option<proto::Envelope>, ConnectionId) -> Option<BoxFuture<bool>>,
 >;
 
+#[derive(Clone, Copy)]
+pub struct Receipt<T> {
+    sender_id: ConnectionId,
+    message_id: u32,
+    payload_type: PhantomData<T>,
+}
+
 pub struct TypedEnvelope<T> {
-    pub id: u32,
-    pub connection_id: ConnectionId,
+    pub sender_id: ConnectionId,
+    pub original_sender_id: Option<PeerId>,
+    pub message_id: u32,
     pub payload: T,
 }
 
+impl<T: RequestMessage> TypedEnvelope<T> {
+    pub fn receipt(&self) -> Receipt<T> {
+        Receipt {
+            sender_id: self.sender_id,
+            message_id: self.message_id,
+            payload_type: PhantomData,
+        }
+    }
+}
+
 pub struct Peer {
     connections: RwLock<HashMap<ConnectionId, Arc<Connection>>>,
     connection_close_barriers: RwLock<HashMap<ConnectionId, barrier::Sender>>,
@@ -81,8 +103,9 @@ impl Peer {
                     Some(
                         async move {
                             tx.send(TypedEnvelope {
-                                id: envelope.id,
-                                connection_id,
+                                sender_id: connection_id,
+                                original_sender_id: envelope.original_sender_id.map(PeerId),
+                                message_id: envelope.id,
                                 payload: T::from_envelope(envelope).unwrap(),
                             })
                             .await
@@ -200,25 +223,45 @@ impl Peer {
     ) -> Result<TypedEnvelope<M>> {
         let connection = self.connection(connection_id).await?;
         let envelope = connection.reader.lock().await.read_message().await?;
-        let id = envelope.id;
+        let original_sender_id = envelope.original_sender_id;
+        let message_id = envelope.id;
         let payload =
             M::from_envelope(envelope).ok_or_else(|| anyhow!("unexpected message type"))?;
         Ok(TypedEnvelope {
-            id,
-            connection_id,
+            sender_id: connection_id,
+            original_sender_id: original_sender_id.map(PeerId),
+            message_id,
             payload,
         })
     }
 
     pub fn request<T: RequestMessage>(
         self: &Arc<Self>,
-        connection_id: ConnectionId,
-        req: T,
+        receiver_id: ConnectionId,
+        request: T,
+    ) -> impl Future<Output = Result<T::Response>> {
+        self.request_internal(None, receiver_id, request)
+    }
+
+    pub fn forward_request<T: RequestMessage>(
+        self: &Arc<Self>,
+        sender_id: ConnectionId,
+        receiver_id: ConnectionId,
+        request: T,
+    ) -> impl Future<Output = Result<T::Response>> {
+        self.request_internal(Some(sender_id), receiver_id, request)
+    }
+
+    pub fn request_internal<T: RequestMessage>(
+        self: &Arc<Self>,
+        original_sender_id: Option<ConnectionId>,
+        receiver_id: ConnectionId,
+        request: T,
     ) -> impl Future<Output = Result<T::Response>> {
         let this = self.clone();
         let (tx, mut rx) = oneshot::channel();
         async move {
-            let connection = this.connection(connection_id).await?;
+            let connection = this.connection(receiver_id).await?;
             let message_id = connection
                 .next_message_id
                 .fetch_add(1, atomic::Ordering::SeqCst);
@@ -231,7 +274,11 @@ impl Peer {
                 .writer
                 .lock()
                 .await
-                .write_message(&req.into_envelope(message_id, None))
+                .write_message(&request.into_envelope(
+                    message_id,
+                    None,
+                    original_sender_id.map(|id| id.0),
+                ))
                 .await?;
             let response = rx
                 .recv()
@@ -257,7 +304,7 @@ impl Peer {
                 .writer
                 .lock()
                 .await
-                .write_message(&message.into_envelope(message_id, None))
+                .write_message(&message.into_envelope(message_id, None, None))
                 .await?;
             Ok(())
         }
@@ -265,12 +312,12 @@ impl Peer {
 
     pub fn respond<T: RequestMessage>(
         self: &Arc<Self>,
-        request: TypedEnvelope<T>,
+        receipt: Receipt<T>,
         response: T::Response,
     ) -> impl Future<Output = Result<()>> {
         let this = self.clone();
         async move {
-            let connection = this.connection(request.connection_id).await?;
+            let connection = this.connection(receipt.sender_id).await?;
             let message_id = connection
                 .next_message_id
                 .fetch_add(1, atomic::Ordering::SeqCst);
@@ -278,7 +325,7 @@ impl Peer {
                 .writer
                 .lock()
                 .await
-                .write_message(&response.into_envelope(message_id, Some(request.id)))
+                .write_message(&response.into_envelope(message_id, Some(receipt.message_id), None))
                 .await?;
             Ok(())
         }
@@ -301,6 +348,12 @@ impl fmt::Display for ConnectionId {
     }
 }
 
+impl fmt::Display for PeerId {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        self.0.fmt(f)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -396,19 +449,31 @@ mod tests {
                 async move {
                     let msg = auth_rx.recv().await.unwrap();
                     assert_eq!(msg.payload, request1);
-                    server.respond(msg, response1.clone()).await.unwrap();
+                    server
+                        .respond(msg.receipt(), response1.clone())
+                        .await
+                        .unwrap();
 
                     let msg = auth_rx.recv().await.unwrap();
                     assert_eq!(msg.payload, request2.clone());
-                    server.respond(msg, response2.clone()).await.unwrap();
+                    server
+                        .respond(msg.receipt(), response2.clone())
+                        .await
+                        .unwrap();
 
                     let msg = open_buffer_rx.recv().await.unwrap();
                     assert_eq!(msg.payload, request3.clone());
-                    server.respond(msg, response3.clone()).await.unwrap();
+                    server
+                        .respond(msg.receipt(), response3.clone())
+                        .await
+                        .unwrap();
 
                     let msg = open_buffer_rx.recv().await.unwrap();
                     assert_eq!(msg.payload, request4.clone());
-                    server.respond(msg, response4.clone()).await.unwrap();
+                    server
+                        .respond(msg.receipt(), response4.clone())
+                        .await
+                        .unwrap();
 
                     server_done_tx.send(()).await.unwrap();
                 }

zed-rpc/src/proto.rs 🔗

@@ -6,7 +6,12 @@ include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
 
 pub trait EnvelopedMessage: Sized + Send + 'static {
     const NAME: &'static str;
-    fn into_envelope(self, id: u32, responding_to: Option<u32>) -> Envelope;
+    fn into_envelope(
+        self,
+        id: u32,
+        responding_to: Option<u32>,
+        original_sender_id: Option<u32>,
+    ) -> Envelope;
     fn matches_envelope(envelope: &Envelope) -> bool;
     fn from_envelope(envelope: Envelope) -> Option<Self>;
 }
@@ -20,10 +25,16 @@ macro_rules! message {
         impl EnvelopedMessage for $name {
             const NAME: &'static str = std::stringify!($name);
 
-            fn into_envelope(self, id: u32, responding_to: Option<u32>) -> Envelope {
+            fn into_envelope(
+                self,
+                id: u32,
+                responding_to: Option<u32>,
+                original_sender_id: Option<u32>,
+            ) -> Envelope {
                 Envelope {
                     id,
                     responding_to,
+                    original_sender_id,
                     payload: Some(envelope::Payload::$name(self)),
                 }
             }
@@ -132,13 +143,13 @@ mod tests {
                 user_id: 5,
                 access_token: "the-access-token".into(),
             }
-            .into_envelope(3, None);
+            .into_envelope(3, None, None);
 
             let message2 = OpenBuffer {
                 worktree_id: 1,
                 path: "path".to_string(),
             }
-            .into_envelope(5, None);
+            .into_envelope(5, None, None);
 
             let mut message_stream = MessageStream::new(byte_stream);
             message_stream.write_message(&message1).await.unwrap();

zed/src/rpc.rs 🔗

@@ -1,14 +1,18 @@
+use crate::worktree::{FileHandle, Worktree};
+
 use super::util::SurfResultExt as _;
 use anyhow::{anyhow, Context, Result};
 use gpui::executor::Background;
-use gpui::{AsyncAppContext, Task};
+use gpui::{AsyncAppContext, ModelHandle, Task};
 use lazy_static::lazy_static;
 use postage::prelude::Stream;
 use smol::lock::Mutex;
+use std::collections::{HashMap, HashSet};
 use std::time::Duration;
 use std::{convert::TryFrom, future::Future, sync::Arc};
 use surf::Url;
 use zed_rpc::{proto::RequestMessage, rest, Peer, TypedEnvelope};
+use zed_rpc::{PeerId, Receipt};
 
 pub use zed_rpc::{proto, ConnectionId};
 
@@ -20,13 +24,14 @@ lazy_static! {
 #[derive(Clone)]
 pub struct Client {
     peer: Arc<Peer>,
-    state: Arc<Mutex<ClientState>>,
+    pub state: Arc<Mutex<ClientState>>,
 }
 
 #[derive(Default)]
-struct ClientState {
-    // TODO - allow multiple connections
+pub struct ClientState {
     connection_id: Option<ConnectionId>,
+    pub shared_worktrees: HashSet<ModelHandle<Worktree>>,
+    pub shared_files: HashMap<FileHandle, HashMap<PeerId, usize>>,
 }
 
 impl Client {
@@ -42,11 +47,11 @@ impl Client {
         H: 'static + for<'a> MessageHandler<'a, M>,
         M: proto::EnvelopedMessage,
     {
-        let peer = self.peer.clone();
-        let mut messages = smol::block_on(peer.add_message_handler::<M>());
+        let this = self.clone();
+        let mut messages = smol::block_on(this.peer.add_message_handler::<M>());
         cx.spawn(|mut cx| async move {
             while let Some(message) = messages.recv().await {
-                if let Err(err) = handler.handle(message, &peer, &mut cx).await {
+                if let Err(err) = handler.handle(message, &this, &mut cx).await {
                     log::error!("error handling message: {:?}", err);
                 }
             }
@@ -189,9 +194,17 @@ impl Client {
     pub fn request<T: RequestMessage>(
         &self,
         connection_id: ConnectionId,
-        req: T,
+        request: T,
     ) -> impl Future<Output = Result<T::Response>> {
-        self.peer.request(connection_id, req)
+        self.peer.request(connection_id, request)
+    }
+
+    pub fn respond<T: RequestMessage>(
+        &self,
+        receipt: Receipt<T>,
+        response: T::Response,
+    ) -> impl Future<Output = Result<()>> {
+        self.peer.respond(receipt, response)
     }
 }
 
@@ -201,7 +214,7 @@ pub trait MessageHandler<'a, M: proto::EnvelopedMessage> {
     fn handle(
         &self,
         message: TypedEnvelope<M>,
-        rpc: &'a Arc<Peer>,
+        rpc: &'a Client,
         cx: &'a mut gpui::AsyncAppContext,
     ) -> Self::Output;
 }
@@ -209,7 +222,7 @@ pub trait MessageHandler<'a, M: proto::EnvelopedMessage> {
 impl<'a, M, F, Fut> MessageHandler<'a, M> for F
 where
     M: proto::EnvelopedMessage,
-    F: Fn(TypedEnvelope<M>, &'a Arc<Peer>, &'a mut gpui::AsyncAppContext) -> Fut,
+    F: Fn(TypedEnvelope<M>, &'a Client, &'a mut gpui::AsyncAppContext) -> Fut,
     Fut: 'a + Future<Output = anyhow::Result<()>>,
 {
     type Output = Fut;
@@ -217,7 +230,7 @@ where
     fn handle(
         &self,
         message: TypedEnvelope<M>,
-        rpc: &'a Arc<Peer>,
+        rpc: &'a Client,
         cx: &'a mut gpui::AsyncAppContext,
     ) -> Self::Output {
         (self)(message, rpc, cx)

zed/src/workspace.rs 🔗

@@ -28,7 +28,7 @@ use std::{
     path::{Path, PathBuf},
     sync::Arc,
 };
-use zed_rpc::{proto, Peer, TypedEnvelope};
+use zed_rpc::{proto, TypedEnvelope};
 
 pub fn init(cx: &mut MutableAppContext, rpc: rpc::Client) {
     cx.add_global_action("workspace:open", open);
@@ -44,7 +44,8 @@ pub fn init(cx: &mut MutableAppContext, rpc: rpc::Client) {
     ]);
     pane::init(cx);
 
-    rpc.on_message(handle_open_buffer, cx);
+    rpc.on_message(remote::open_file, cx);
+    rpc.on_message(remote::open_buffer, cx);
 }
 
 pub struct OpenParams {
@@ -106,18 +107,57 @@ fn open_paths(params: &OpenParams, cx: &mut MutableAppContext) {
     });
 }
 
-async fn handle_open_buffer(
-    request: TypedEnvelope<proto::OpenBuffer>,
-    rpc: &Arc<Peer>,
-    cx: &mut AsyncAppContext,
-) -> anyhow::Result<()> {
-    let payload = &request.payload;
-    dbg!(&payload.path);
-    rpc.respond(request, proto::OpenBufferResponse { buffer: None })
+mod remote {
+    use super::*;
+
+    pub async fn open_file(
+        request: TypedEnvelope<proto::OpenFile>,
+        rpc: &rpc::Client,
+        cx: &mut AsyncAppContext,
+    ) -> anyhow::Result<()> {
+        let message = &request.payload;
+        let mut state = rpc.state.lock().await;
+
+        let worktree = state
+            .shared_worktrees
+            .get(&(message.worktree_id as usize))
+            .ok_or_else(|| anyhow!("worktree {} not found", message.worktree_id))?
+            .clone();
+
+        let peer_id = request
+            .original_sender_id
+            .ok_or_else(|| anyhow!("missing original sender id"))?;
+
+        let file = cx.update(|cx| worktree.file(&message.path, cx)).await?;
+
+        let file_entry = state.shared_files.entry(file);
+        if matches!(file_entry, Entry::Vacant(_)) {
+            worktree.update(cx, |worktree, cx| {});
+        }
+        *file_entry
+            .or_insert(Default::default())
+            .entry(peer_id)
+            .or_insert(0) += 1;
+
+        todo!()
+    }
+
+    pub async fn open_buffer(
+        request: TypedEnvelope<proto::OpenBuffer>,
+        rpc: &rpc::Client,
+        cx: &mut AsyncAppContext,
+    ) -> anyhow::Result<()> {
+        let payload = &request.payload;
+        dbg!(&payload.path);
+        rpc.respond(
+            request.receipt(),
+            proto::OpenBufferResponse { buffer: None },
+        )
         .await?;
 
-    dbg!(cx.read(|app| app.root_view_id(1)));
-    Ok(())
+        dbg!(cx.read(|app| app.root_view_id(1)));
+        Ok(())
+    }
 }
 
 pub trait Item: Entity + Sized {

zed/src/worktree.rs 🔗

@@ -2,6 +2,7 @@ mod char_bag;
 mod fuzzy;
 mod ignore;
 
+use self::{char_bag::CharBag, ignore::IgnoreStack};
 use crate::{
     editor::{History, Rope},
     rpc::{self, proto, ConnectionId},
@@ -25,16 +26,18 @@ use std::{
     ffi::{CStr, OsStr, OsString},
     fmt, fs,
     future::Future,
+    hash::Hash,
     io::{self, Read, Write},
     ops::Deref,
     os::unix::{ffi::OsStrExt, fs::MetadataExt},
     path::{Path, PathBuf},
-    sync::{Arc, Weak},
+    sync::{
+        atomic::{AtomicUsize, Ordering::SeqCst},
+        Arc, Weak,
+    },
     time::{Duration, SystemTime},
 };
 
-use self::{char_bag::CharBag, ignore::IgnoreStack};
-
 lazy_static! {
     static ref GITIGNORE: &'static OsStr = OsStr::new(".gitignore");
 }
@@ -132,6 +135,7 @@ pub struct LocalWorktree {
     snapshot: Snapshot,
     background_snapshot: Arc<Mutex<Snapshot>>,
     handles: Arc<Mutex<HashMap<Arc<Path>, Weak<Mutex<FileHandleState>>>>>,
+    next_handle_id: AtomicUsize,
     scan_state: (watch::Sender<ScanState>, watch::Receiver<ScanState>),
     _event_stream_handle: fsevent::Handle,
     poll_scheduled: bool,
@@ -149,6 +153,7 @@ struct FileHandleState {
     path: Arc<Path>,
     is_deleted: bool,
     mtime: SystemTime,
+    id: usize,
 }
 
 impl LocalWorktree {
@@ -174,6 +179,7 @@ impl LocalWorktree {
             snapshot,
             background_snapshot: background_snapshot.clone(),
             handles: handles.clone(),
+            next_handle_id: Default::default(),
             scan_state: watch::channel_with(ScanState::Scanning),
             _event_stream_handle: event_stream_handle,
             poll_scheduled: false,
@@ -326,6 +332,7 @@ impl LocalWorktree {
         self.rpc = Some(client.clone());
         let root_name = self.root_name.clone();
         let snapshot = self.snapshot();
+        let handle = cx.handle();
         cx.spawn(|_this, cx| async move {
             let entries = cx
                 .background_executor()
@@ -353,6 +360,8 @@ impl LocalWorktree {
                 )
                 .await?;
 
+            client.state.lock().await.shared_worktrees.insert(handle);
+
             log::info!("sharing worktree {:?}", share_response);
             Ok((share_response.worktree_id, share_response.access_token))
         })
@@ -685,6 +694,21 @@ impl FileHandle {
     }
 }
 
+impl PartialEq for FileHandle {
+    fn eq(&self, other: &Self) -> bool {
+        self.worktree == other.worktree && self.state.lock().id == other.state.lock().id
+    }
+}
+
+impl Eq for FileHandle {}
+
+impl Hash for FileHandle {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.state.lock().id.hash(state);
+        self.worktree.hash(state);
+    }
+}
+
 #[derive(Clone, Debug)]
 pub struct Entry {
     kind: EntryKind,
@@ -1420,17 +1444,21 @@ impl WorktreeHandle for ModelHandle<Worktree> {
                             .get(&path)
                             .and_then(Weak::upgrade)
                             .unwrap_or_else(|| {
+                                let id =
+                                    tree.as_local().unwrap().next_handle_id.fetch_add(1, SeqCst);
                                 let handle_state = if let Some(entry) = tree.entry_for_path(&path) {
                                     FileHandleState {
                                         path: entry.path().clone(),
                                         is_deleted: false,
                                         mtime,
+                                        id,
                                     }
                                 } else {
                                     FileHandleState {
                                         path: path.clone(),
                                         is_deleted: !tree.path_is_pending(path),
                                         mtime,
+                                        id,
                                     }
                                 };