From 0deaa3a61de0c9137d09c0dc5fc37e8b6839da85 Mon Sep 17 00:00:00 2001 From: Nathan Sobo Date: Fri, 18 Jun 2021 17:26:12 -0600 Subject: [PATCH] WIP Co-Authored-By: Max Brunsfeld --- zed-rpc/proto/zed.proto | 38 ++++++++++----- zed-rpc/src/lib.rs | 2 +- zed-rpc/src/peer.rs | 103 ++++++++++++++++++++++++++++++++-------- zed-rpc/src/proto.rs | 19 ++++++-- zed/src/rpc.rs | 37 ++++++++++----- zed/src/workspace.rs | 64 ++++++++++++++++++++----- zed/src/worktree.rs | 34 +++++++++++-- 7 files changed, 235 insertions(+), 62 deletions(-) diff --git a/zed-rpc/proto/zed.proto b/zed-rpc/proto/zed.proto index 6ed4c29bc4c62c7a6db777545240e09bfb24c2d9..efbf8a02ef0ebd9859d6d1be71ceb9fccc29281d 100644 --- a/zed-rpc/proto/zed.proto +++ b/zed-rpc/proto/zed.proto @@ -4,18 +4,19 @@ package zed.messages; message Envelope { uint32 id = 1; optional uint32 responding_to = 2; + optional uint32 original_sender_id = 3; oneof payload { - Auth auth = 3; - AuthResponse auth_response = 4; - ShareWorktree share_worktree = 5; - ShareWorktreeResponse share_worktree_response = 6; - OpenWorktree open_worktree = 7; - OpenWorktreeResponse open_worktree_response = 8; - OpenFile open_file = 9; - OpenFileResponse open_file_response = 10; - CloseFile close_file = 11; - OpenBuffer open_buffer = 12; - OpenBufferResponse open_buffer_response = 13; + Auth auth = 4; + AuthResponse auth_response = 5; + ShareWorktree share_worktree = 6; + ShareWorktreeResponse share_worktree_response = 7; + OpenWorktree open_worktree = 8; + OpenWorktreeResponse open_worktree_response = 9; + OpenFile open_file = 10; + OpenFileResponse open_file_response = 11; + CloseFile close_file = 12; + OpenBuffer open_buffer = 13; + OpenBufferResponse open_buffer_response = 14; } } @@ -46,6 +47,15 @@ message OpenWorktreeResponse { Worktree worktree = 1; } +message AddGuest { + uint64 worktree_id = 1; + User user = 2; +} + +message RemoveGuest { + uint64 worktree_id = 1; +} + message OpenFile { uint64 worktree_id = 1; string path = 2; @@ -69,6 +79,12 @@ message OpenBufferResponse { Buffer buffer = 1; } +message User { + string github_login = 1; + string avatar_url = 2; + uint64 id = 3; +} + message Worktree { string root_name = 1; repeated Entry entries = 2; diff --git a/zed-rpc/src/lib.rs b/zed-rpc/src/lib.rs index c57dec14049f2b20641a0c6ca0fc77bf79cba160..e9e18e92b8d5e9b22f59d64be4dec0869d9a3277 100644 --- a/zed-rpc/src/lib.rs +++ b/zed-rpc/src/lib.rs @@ -3,4 +3,4 @@ mod peer; pub mod proto; pub mod rest; -pub use peer::{ConnectionId, Peer, TypedEnvelope}; +pub use peer::*; diff --git a/zed-rpc/src/peer.rs b/zed-rpc/src/peer.rs index 4c23060a5cc2e3787fbc07c34a836661a2e89d0d..48111c2b9232853d05575e2073caed48d7ca2812 100644 --- a/zed-rpc/src/peer.rs +++ b/zed-rpc/src/peer.rs @@ -14,6 +14,7 @@ use std::{ collections::{HashMap, HashSet}, fmt, future::Future, + marker::PhantomData, pin::Pin, sync::{ atomic::{self, AtomicU32}, @@ -27,6 +28,9 @@ type BoxedReader = Pin>; #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] pub struct ConnectionId(u32); +#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] +pub struct PeerId(u32); + struct Connection { writer: Mutex>, reader: Mutex>, @@ -38,12 +42,30 @@ type MessageHandler = Box< dyn Send + Sync + Fn(&mut Option, ConnectionId) -> Option>, >; +#[derive(Clone, Copy)] +pub struct Receipt { + sender_id: ConnectionId, + message_id: u32, + payload_type: PhantomData, +} + pub struct TypedEnvelope { - pub id: u32, - pub connection_id: ConnectionId, + pub sender_id: ConnectionId, + pub original_sender_id: Option, + pub message_id: u32, pub payload: T, } +impl TypedEnvelope { + pub fn receipt(&self) -> Receipt { + Receipt { + sender_id: self.sender_id, + message_id: self.message_id, + payload_type: PhantomData, + } + } +} + pub struct Peer { connections: RwLock>>, connection_close_barriers: RwLock>, @@ -81,8 +103,9 @@ impl Peer { Some( async move { tx.send(TypedEnvelope { - id: envelope.id, - connection_id, + sender_id: connection_id, + original_sender_id: envelope.original_sender_id.map(PeerId), + message_id: envelope.id, payload: T::from_envelope(envelope).unwrap(), }) .await @@ -200,25 +223,45 @@ impl Peer { ) -> Result> { let connection = self.connection(connection_id).await?; let envelope = connection.reader.lock().await.read_message().await?; - let id = envelope.id; + let original_sender_id = envelope.original_sender_id; + let message_id = envelope.id; let payload = M::from_envelope(envelope).ok_or_else(|| anyhow!("unexpected message type"))?; Ok(TypedEnvelope { - id, - connection_id, + sender_id: connection_id, + original_sender_id: original_sender_id.map(PeerId), + message_id, payload, }) } pub fn request( self: &Arc, - connection_id: ConnectionId, - req: T, + receiver_id: ConnectionId, + request: T, + ) -> impl Future> { + self.request_internal(None, receiver_id, request) + } + + pub fn forward_request( + self: &Arc, + sender_id: ConnectionId, + receiver_id: ConnectionId, + request: T, + ) -> impl Future> { + self.request_internal(Some(sender_id), receiver_id, request) + } + + pub fn request_internal( + self: &Arc, + original_sender_id: Option, + receiver_id: ConnectionId, + request: T, ) -> impl Future> { let this = self.clone(); let (tx, mut rx) = oneshot::channel(); async move { - let connection = this.connection(connection_id).await?; + let connection = this.connection(receiver_id).await?; let message_id = connection .next_message_id .fetch_add(1, atomic::Ordering::SeqCst); @@ -231,7 +274,11 @@ impl Peer { .writer .lock() .await - .write_message(&req.into_envelope(message_id, None)) + .write_message(&request.into_envelope( + message_id, + None, + original_sender_id.map(|id| id.0), + )) .await?; let response = rx .recv() @@ -257,7 +304,7 @@ impl Peer { .writer .lock() .await - .write_message(&message.into_envelope(message_id, None)) + .write_message(&message.into_envelope(message_id, None, None)) .await?; Ok(()) } @@ -265,12 +312,12 @@ impl Peer { pub fn respond( self: &Arc, - request: TypedEnvelope, + receipt: Receipt, response: T::Response, ) -> impl Future> { let this = self.clone(); async move { - let connection = this.connection(request.connection_id).await?; + let connection = this.connection(receipt.sender_id).await?; let message_id = connection .next_message_id .fetch_add(1, atomic::Ordering::SeqCst); @@ -278,7 +325,7 @@ impl Peer { .writer .lock() .await - .write_message(&response.into_envelope(message_id, Some(request.id))) + .write_message(&response.into_envelope(message_id, Some(receipt.message_id), None)) .await?; Ok(()) } @@ -301,6 +348,12 @@ impl fmt::Display for ConnectionId { } } +impl fmt::Display for PeerId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} + #[cfg(test)] mod tests { use super::*; @@ -396,19 +449,31 @@ mod tests { async move { let msg = auth_rx.recv().await.unwrap(); assert_eq!(msg.payload, request1); - server.respond(msg, response1.clone()).await.unwrap(); + server + .respond(msg.receipt(), response1.clone()) + .await + .unwrap(); let msg = auth_rx.recv().await.unwrap(); assert_eq!(msg.payload, request2.clone()); - server.respond(msg, response2.clone()).await.unwrap(); + server + .respond(msg.receipt(), response2.clone()) + .await + .unwrap(); let msg = open_buffer_rx.recv().await.unwrap(); assert_eq!(msg.payload, request3.clone()); - server.respond(msg, response3.clone()).await.unwrap(); + server + .respond(msg.receipt(), response3.clone()) + .await + .unwrap(); let msg = open_buffer_rx.recv().await.unwrap(); assert_eq!(msg.payload, request4.clone()); - server.respond(msg, response4.clone()).await.unwrap(); + server + .respond(msg.receipt(), response4.clone()) + .await + .unwrap(); server_done_tx.send(()).await.unwrap(); } diff --git a/zed-rpc/src/proto.rs b/zed-rpc/src/proto.rs index ff5a2a203ff86c96c008480260de5a61ff3672c6..196ddeddc8d6337b934896c06566b82f32b5a4de 100644 --- a/zed-rpc/src/proto.rs +++ b/zed-rpc/src/proto.rs @@ -6,7 +6,12 @@ include!(concat!(env!("OUT_DIR"), "/zed.messages.rs")); pub trait EnvelopedMessage: Sized + Send + 'static { const NAME: &'static str; - fn into_envelope(self, id: u32, responding_to: Option) -> Envelope; + fn into_envelope( + self, + id: u32, + responding_to: Option, + original_sender_id: Option, + ) -> Envelope; fn matches_envelope(envelope: &Envelope) -> bool; fn from_envelope(envelope: Envelope) -> Option; } @@ -20,10 +25,16 @@ macro_rules! message { impl EnvelopedMessage for $name { const NAME: &'static str = std::stringify!($name); - fn into_envelope(self, id: u32, responding_to: Option) -> Envelope { + fn into_envelope( + self, + id: u32, + responding_to: Option, + original_sender_id: Option, + ) -> Envelope { Envelope { id, responding_to, + original_sender_id, payload: Some(envelope::Payload::$name(self)), } } @@ -132,13 +143,13 @@ mod tests { user_id: 5, access_token: "the-access-token".into(), } - .into_envelope(3, None); + .into_envelope(3, None, None); let message2 = OpenBuffer { worktree_id: 1, path: "path".to_string(), } - .into_envelope(5, None); + .into_envelope(5, None, None); let mut message_stream = MessageStream::new(byte_stream); message_stream.write_message(&message1).await.unwrap(); diff --git a/zed/src/rpc.rs b/zed/src/rpc.rs index 886b76364b395b206316dc6664d7b700659a6388..f4e8f43834297a8bc9510fa3c5a5c953df6f9667 100644 --- a/zed/src/rpc.rs +++ b/zed/src/rpc.rs @@ -1,14 +1,18 @@ +use crate::worktree::{FileHandle, Worktree}; + use super::util::SurfResultExt as _; use anyhow::{anyhow, Context, Result}; use gpui::executor::Background; -use gpui::{AsyncAppContext, Task}; +use gpui::{AsyncAppContext, ModelHandle, Task}; use lazy_static::lazy_static; use postage::prelude::Stream; use smol::lock::Mutex; +use std::collections::{HashMap, HashSet}; use std::time::Duration; use std::{convert::TryFrom, future::Future, sync::Arc}; use surf::Url; use zed_rpc::{proto::RequestMessage, rest, Peer, TypedEnvelope}; +use zed_rpc::{PeerId, Receipt}; pub use zed_rpc::{proto, ConnectionId}; @@ -20,13 +24,14 @@ lazy_static! { #[derive(Clone)] pub struct Client { peer: Arc, - state: Arc>, + pub state: Arc>, } #[derive(Default)] -struct ClientState { - // TODO - allow multiple connections +pub struct ClientState { connection_id: Option, + pub shared_worktrees: HashSet>, + pub shared_files: HashMap>, } impl Client { @@ -42,11 +47,11 @@ impl Client { H: 'static + for<'a> MessageHandler<'a, M>, M: proto::EnvelopedMessage, { - let peer = self.peer.clone(); - let mut messages = smol::block_on(peer.add_message_handler::()); + let this = self.clone(); + let mut messages = smol::block_on(this.peer.add_message_handler::()); cx.spawn(|mut cx| async move { while let Some(message) = messages.recv().await { - if let Err(err) = handler.handle(message, &peer, &mut cx).await { + if let Err(err) = handler.handle(message, &this, &mut cx).await { log::error!("error handling message: {:?}", err); } } @@ -189,9 +194,17 @@ impl Client { pub fn request( &self, connection_id: ConnectionId, - req: T, + request: T, ) -> impl Future> { - self.peer.request(connection_id, req) + self.peer.request(connection_id, request) + } + + pub fn respond( + &self, + receipt: Receipt, + response: T::Response, + ) -> impl Future> { + self.peer.respond(receipt, response) } } @@ -201,7 +214,7 @@ pub trait MessageHandler<'a, M: proto::EnvelopedMessage> { fn handle( &self, message: TypedEnvelope, - rpc: &'a Arc, + rpc: &'a Client, cx: &'a mut gpui::AsyncAppContext, ) -> Self::Output; } @@ -209,7 +222,7 @@ pub trait MessageHandler<'a, M: proto::EnvelopedMessage> { impl<'a, M, F, Fut> MessageHandler<'a, M> for F where M: proto::EnvelopedMessage, - F: Fn(TypedEnvelope, &'a Arc, &'a mut gpui::AsyncAppContext) -> Fut, + F: Fn(TypedEnvelope, &'a Client, &'a mut gpui::AsyncAppContext) -> Fut, Fut: 'a + Future>, { type Output = Fut; @@ -217,7 +230,7 @@ where fn handle( &self, message: TypedEnvelope, - rpc: &'a Arc, + rpc: &'a Client, cx: &'a mut gpui::AsyncAppContext, ) -> Self::Output { (self)(message, rpc, cx) diff --git a/zed/src/workspace.rs b/zed/src/workspace.rs index e97c4527d8c7c9860766aca8b3ab002facdcea36..411dc878c02aac5318ada697e621db402765be83 100644 --- a/zed/src/workspace.rs +++ b/zed/src/workspace.rs @@ -28,7 +28,7 @@ use std::{ path::{Path, PathBuf}, sync::Arc, }; -use zed_rpc::{proto, Peer, TypedEnvelope}; +use zed_rpc::{proto, TypedEnvelope}; pub fn init(cx: &mut MutableAppContext, rpc: rpc::Client) { cx.add_global_action("workspace:open", open); @@ -44,7 +44,8 @@ pub fn init(cx: &mut MutableAppContext, rpc: rpc::Client) { ]); pane::init(cx); - rpc.on_message(handle_open_buffer, cx); + rpc.on_message(remote::open_file, cx); + rpc.on_message(remote::open_buffer, cx); } pub struct OpenParams { @@ -106,18 +107,57 @@ fn open_paths(params: &OpenParams, cx: &mut MutableAppContext) { }); } -async fn handle_open_buffer( - request: TypedEnvelope, - rpc: &Arc, - cx: &mut AsyncAppContext, -) -> anyhow::Result<()> { - let payload = &request.payload; - dbg!(&payload.path); - rpc.respond(request, proto::OpenBufferResponse { buffer: None }) +mod remote { + use super::*; + + pub async fn open_file( + request: TypedEnvelope, + rpc: &rpc::Client, + cx: &mut AsyncAppContext, + ) -> anyhow::Result<()> { + let message = &request.payload; + let mut state = rpc.state.lock().await; + + let worktree = state + .shared_worktrees + .get(&(message.worktree_id as usize)) + .ok_or_else(|| anyhow!("worktree {} not found", message.worktree_id))? + .clone(); + + let peer_id = request + .original_sender_id + .ok_or_else(|| anyhow!("missing original sender id"))?; + + let file = cx.update(|cx| worktree.file(&message.path, cx)).await?; + + let file_entry = state.shared_files.entry(file); + if matches!(file_entry, Entry::Vacant(_)) { + worktree.update(cx, |worktree, cx| {}); + } + *file_entry + .or_insert(Default::default()) + .entry(peer_id) + .or_insert(0) += 1; + + todo!() + } + + pub async fn open_buffer( + request: TypedEnvelope, + rpc: &rpc::Client, + cx: &mut AsyncAppContext, + ) -> anyhow::Result<()> { + let payload = &request.payload; + dbg!(&payload.path); + rpc.respond( + request.receipt(), + proto::OpenBufferResponse { buffer: None }, + ) .await?; - dbg!(cx.read(|app| app.root_view_id(1))); - Ok(()) + dbg!(cx.read(|app| app.root_view_id(1))); + Ok(()) + } } pub trait Item: Entity + Sized { diff --git a/zed/src/worktree.rs b/zed/src/worktree.rs index e003522cd66a6a3e4a0e5b1ae21a6feb67da2a5d..9395ccd41c5f97123dd07629db0677167d1ef7fe 100644 --- a/zed/src/worktree.rs +++ b/zed/src/worktree.rs @@ -2,6 +2,7 @@ mod char_bag; mod fuzzy; mod ignore; +use self::{char_bag::CharBag, ignore::IgnoreStack}; use crate::{ editor::{History, Rope}, rpc::{self, proto, ConnectionId}, @@ -25,16 +26,18 @@ use std::{ ffi::{CStr, OsStr, OsString}, fmt, fs, future::Future, + hash::Hash, io::{self, Read, Write}, ops::Deref, os::unix::{ffi::OsStrExt, fs::MetadataExt}, path::{Path, PathBuf}, - sync::{Arc, Weak}, + sync::{ + atomic::{AtomicUsize, Ordering::SeqCst}, + Arc, Weak, + }, time::{Duration, SystemTime}, }; -use self::{char_bag::CharBag, ignore::IgnoreStack}; - lazy_static! { static ref GITIGNORE: &'static OsStr = OsStr::new(".gitignore"); } @@ -132,6 +135,7 @@ pub struct LocalWorktree { snapshot: Snapshot, background_snapshot: Arc>, handles: Arc, Weak>>>>, + next_handle_id: AtomicUsize, scan_state: (watch::Sender, watch::Receiver), _event_stream_handle: fsevent::Handle, poll_scheduled: bool, @@ -149,6 +153,7 @@ struct FileHandleState { path: Arc, is_deleted: bool, mtime: SystemTime, + id: usize, } impl LocalWorktree { @@ -174,6 +179,7 @@ impl LocalWorktree { snapshot, background_snapshot: background_snapshot.clone(), handles: handles.clone(), + next_handle_id: Default::default(), scan_state: watch::channel_with(ScanState::Scanning), _event_stream_handle: event_stream_handle, poll_scheduled: false, @@ -326,6 +332,7 @@ impl LocalWorktree { self.rpc = Some(client.clone()); let root_name = self.root_name.clone(); let snapshot = self.snapshot(); + let handle = cx.handle(); cx.spawn(|_this, cx| async move { let entries = cx .background_executor() @@ -353,6 +360,8 @@ impl LocalWorktree { ) .await?; + client.state.lock().await.shared_worktrees.insert(handle); + log::info!("sharing worktree {:?}", share_response); Ok((share_response.worktree_id, share_response.access_token)) }) @@ -685,6 +694,21 @@ impl FileHandle { } } +impl PartialEq for FileHandle { + fn eq(&self, other: &Self) -> bool { + self.worktree == other.worktree && self.state.lock().id == other.state.lock().id + } +} + +impl Eq for FileHandle {} + +impl Hash for FileHandle { + fn hash(&self, state: &mut H) { + self.state.lock().id.hash(state); + self.worktree.hash(state); + } +} + #[derive(Clone, Debug)] pub struct Entry { kind: EntryKind, @@ -1420,17 +1444,21 @@ impl WorktreeHandle for ModelHandle { .get(&path) .and_then(Weak::upgrade) .unwrap_or_else(|| { + let id = + tree.as_local().unwrap().next_handle_id.fetch_add(1, SeqCst); let handle_state = if let Some(entry) = tree.entry_for_path(&path) { FileHandleState { path: entry.path().clone(), is_deleted: false, mtime, + id, } } else { FileHandleState { path: path.clone(), is_deleted: !tree.path_is_pending(path), mtime, + id, } };