diff --git a/zed-rpc/proto/zed.proto b/zed-rpc/proto/zed.proto index 634f167fdce3c51401215d6385d2c4801afeec43..879c9766cce8ba4a22a2d2e99a5b00b22738d064 100644 --- a/zed-rpc/proto/zed.proto +++ b/zed-rpc/proto/zed.proto @@ -12,11 +12,9 @@ message Envelope { ShareWorktreeResponse share_worktree_response = 7; OpenWorktree open_worktree = 8; OpenWorktreeResponse open_worktree_response = 9; - OpenFile open_file = 10; - OpenFileResponse open_file_response = 11; - CloseFile close_file = 12; - OpenBuffer open_buffer = 13; - OpenBufferResponse open_buffer_response = 14; + OpenBuffer open_buffer = 10; + OpenBufferResponse open_buffer_response = 11; + CloseBuffer close_buffer = 12; } } @@ -57,28 +55,18 @@ message RemoveGuest { uint64 worktree_id = 1; } -message OpenFile { +message OpenBuffer { uint64 worktree_id = 1; string path = 2; } -message OpenFileResponse { - uint64 id = 1; - uint64 mtime = 2; -} - -message CloseFile { - uint64 worktree_id = 1; - uint64 id = 2; -} - -message OpenBuffer { - uint64 worktree_id = 1; - uint64 id = 2; +message OpenBufferResponse { + uint64 buffer_id = 1; + Buffer buffer = 2; } -message OpenBufferResponse { - Buffer buffer = 1; +message CloseBuffer { + uint64 id = 1; } message User { @@ -116,15 +104,22 @@ message Operation { uint32 local_timestamp = 2; uint32 lamport_timestamp = 3; repeated VectorClockEntry version = 4; + repeated Range ranges = 5; + optional string new_text = 6; } +} - message VectorClockEntry { - uint32 replica_id = 1; - uint32 timestamp = 2; - } +message VectorClockEntry { + uint32 replica_id = 1; + uint32 timestamp = 2; } message Timestamp { uint64 seconds = 1; uint32 nanos = 2; -} \ No newline at end of file +} + +message Range { + uint64 start = 1; + uint64 end = 2; +} diff --git a/zed-rpc/src/peer.rs b/zed-rpc/src/peer.rs index f1a6cc4cd3bfef4b113cda7a80f913618eb1f815..0d39b84eb50b0c43f16e30b903194fc0d84eb494 100644 --- a/zed-rpc/src/peer.rs +++ b/zed-rpc/src/peer.rs @@ -432,9 +432,10 @@ mod tests { }; let request3 = proto::OpenBuffer { worktree_id: 1, - id: 2, + path: "path/two".to_string(), }; let response3 = proto::OpenBufferResponse { + buffer_id: 2, buffer: Some(proto::Buffer { content: "path/two content".to_string(), history: vec![], @@ -442,9 +443,10 @@ mod tests { }; let request4 = proto::OpenBuffer { worktree_id: 2, - id: 1, + path: "path/one".to_string(), }; let response4 = proto::OpenBufferResponse { + buffer_id: 1, buffer: Some(proto::Buffer { content: "path/one content".to_string(), history: vec![], diff --git a/zed-rpc/src/proto.rs b/zed-rpc/src/proto.rs index 1330728b907dbb3e910a6fb2f6cece1a41e78d65..e1eafcd0cd11247f5f730d2fe573f16784c50897 100644 --- a/zed-rpc/src/proto.rs +++ b/zed-rpc/src/proto.rs @@ -71,9 +71,8 @@ macro_rules! request_message { request_message!(Auth, AuthResponse); request_message!(ShareWorktree, ShareWorktreeResponse); request_message!(OpenWorktree, OpenWorktreeResponse); -request_message!(OpenFile, OpenFileResponse); -message!(CloseFile); request_message!(OpenBuffer, OpenBufferResponse); +message!(CloseBuffer); /// A stream of protobuf messages. pub struct MessageStream { @@ -169,7 +168,7 @@ mod tests { let message2 = OpenBuffer { worktree_id: 0, - id: 1, + path: "some/path".to_string(), } .into_envelope(5, None, None); diff --git a/zed/src/editor/buffer.rs b/zed/src/editor/buffer.rs index d281353c58458b5708718bc70f59792496cecd8a..e268432f58b3f8314f86728af734667870733211 100644 --- a/zed/src/editor/buffer.rs +++ b/zed/src/editor/buffer.rs @@ -11,6 +11,7 @@ use seahash::SeaHasher; pub use selection::*; use similar::{ChangeTag, TextDiff}; use tree_sitter::{InputEdit, Parser, QueryCursor}; +use zed_rpc::proto; use crate::{ language::{Language, Tree}, @@ -493,6 +494,94 @@ impl Buffer { } } + pub fn from_proto( + replica_id: ReplicaId, + remote_buffer: proto::Buffer, + file: Option, + language: Option>, + cx: &mut ModelContext, + ) -> Result { + let mut buffer = Buffer::build( + replica_id, + History::new(remote_buffer.content.into()), + file, + language, + cx, + ); + let ops = remote_buffer + .history + .into_iter() + .filter_map(|op| op.variant) + .map(|op| match op { + proto::operation::Variant::Edit(edit) => { + let mut version = time::Global::new(); + for entry in edit.version { + version.observe(time::Local { + replica_id: entry.replica_id as ReplicaId, + value: entry.timestamp, + }); + } + let ranges = edit + .ranges + .into_iter() + .map(|range| range.start as usize..range.end as usize) + .collect(); + Operation::Edit(EditOperation { + timestamp: InsertionTimestamp { + replica_id: edit.replica_id as ReplicaId, + local: edit.local_timestamp, + lamport: edit.lamport_timestamp, + }, + version, + ranges, + new_text: edit.new_text, + }) + } + }); + buffer.apply_ops(ops, cx)?; + Ok(buffer) + } + + pub fn to_proto(&self) -> proto::Buffer { + let ops = self + .history + .ops + .values() + .map(|op| { + let version = op + .version + .iter() + .map(|entry| proto::VectorClockEntry { + replica_id: entry.replica_id as u32, + timestamp: entry.value, + }) + .collect(); + let ranges = op + .ranges + .iter() + .map(|range| proto::Range { + start: range.start as u64, + end: range.end as u64, + }) + .collect(); + proto::Operation { + variant: Some(proto::operation::Variant::Edit(proto::operation::Edit { + replica_id: op.timestamp.replica_id as u32, + local_timestamp: op.timestamp.local, + lamport_timestamp: op.timestamp.lamport, + version, + ranges, + new_text: op.new_text.clone(), + })), + } + }) + .collect(); + proto::Buffer { + content: self.history.base_text.to_string(), + history: ops, + } + } + pub fn file(&self) -> Option<&File> { self.file.as_ref() } diff --git a/zed/src/main.rs b/zed/src/main.rs index b5c1cee95b72217e0a4ce78a9dd6713504546acd..0f0007c78890f624b3e7d9bd35db8d6ca05b008b 100644 --- a/zed/src/main.rs +++ b/zed/src/main.rs @@ -21,9 +21,9 @@ fn main() { language_registry.set_theme(&settings.borrow().theme); let app_state = AppState { - language_registry, + language_registry: language_registry.clone(), settings, - rpc: rpc::Client::new(), + rpc: rpc::Client::new(language_registry), }; app.run(move |cx| { diff --git a/zed/src/rpc.rs b/zed/src/rpc.rs index a46fec1142ca04dec42aeb3a8c45e6fe3c105754..2fd0d47e3a59224e0c82b675d1e17c6c54557b06 100644 --- a/zed/src/rpc.rs +++ b/zed/src/rpc.rs @@ -1,6 +1,5 @@ -use crate::worktree::{File, Worktree}; - use super::util::SurfResultExt as _; +use crate::{editor::Buffer, language::LanguageRegistry, worktree::Worktree}; use anyhow::{anyhow, Context, Result}; use gpui::executor::Background; use gpui::{AsyncAppContext, ModelHandle, Task}; @@ -29,18 +28,23 @@ pub struct Client { pub state: Arc>, } -#[derive(Default)] pub struct ClientState { connection_id: Option, pub shared_worktrees: HashMap>, - pub shared_files: HashMap>, + pub shared_buffers: HashMap>>, + pub language_registry: Arc, } impl Client { - pub fn new() -> Self { + pub fn new(language_registry: Arc) -> Self { Self { peer: Peer::new(), - state: Default::default(), + state: Arc::new(Mutex::new(ClientState { + connection_id: None, + shared_worktrees: Default::default(), + shared_buffers: Default::default(), + language_registry, + })), } } diff --git a/zed/src/test.rs b/zed/src/test.rs index bced9164830a428bd62ee53e5a8843be55f64988..d698a50490c6455dd27b0b35641fbe6b85534984 100644 --- a/zed/src/test.rs +++ b/zed/src/test.rs @@ -1,4 +1,4 @@ -use crate::{AppState, language::LanguageRegistry, rpc, settings, time::ReplicaId}; +use crate::{language::LanguageRegistry, rpc, settings, time::ReplicaId, AppState}; use ctor::ctor; use gpui::AppContext; use rand::Rng; @@ -149,7 +149,7 @@ pub fn build_app_state(cx: &AppContext) -> AppState { let language_registry = Arc::new(LanguageRegistry::new()); AppState { settings, - language_registry, - rpc: rpc::Client::new(), + language_registry: language_registry.clone(), + rpc: rpc::Client::new(language_registry), } } diff --git a/zed/src/workspace.rs b/zed/src/workspace.rs index 4945e4f46d5243e9c6b9a4d5627661bddc6c2deb..04819a09e62644671b0f460f3d1dd917fd74c14b 100644 --- a/zed/src/workspace.rs +++ b/zed/src/workspace.rs @@ -44,9 +44,8 @@ pub fn init(cx: &mut MutableAppContext, rpc: rpc::Client) { ]); pane::init(cx); - rpc.on_message(remote::open_file, cx); - rpc.on_message(remote::close_file, cx); rpc.on_message(remote::open_buffer, cx); + rpc.on_message(remote::close_buffer, cx); } pub struct OpenParams { @@ -110,41 +109,52 @@ fn open_paths(params: &OpenParams, cx: &mut MutableAppContext) { mod remote { use super::*; - pub async fn open_file( - request: TypedEnvelope, + pub async fn open_buffer( + request: TypedEnvelope, rpc: &rpc::Client, cx: &mut AsyncAppContext, ) -> anyhow::Result<()> { - // let message = &request.payload; - // let peer_id = request - // .original_sender_id - // .ok_or_else(|| anyhow!("missing original sender id"))?; - - // let mut state = rpc.state.lock().await; - // let worktree = state - // .shared_worktrees - // .get(&message.worktree_id) - // .ok_or_else(|| anyhow!("worktree {} not found", message.worktree_id))? - // .clone(); - - // let file = worktree.file(&message.path); - // let id = file.id() as u64; - // let mtime = file.mtime().as_secs(); + let message = &request.payload; + let peer_id = request + .original_sender_id + .ok_or_else(|| anyhow!("missing original sender id"))?; - // *state - // .shared_files - // .entry(file) - // .or_insert(Default::default()) - // .entry(peer_id) - // .or_insert(0) += 1; + let mut state = rpc.state.lock().await; + let worktree = state + .shared_worktrees + .get(&message.worktree_id) + .ok_or_else(|| anyhow!("worktree {} not found", message.worktree_id))? + .clone(); + + let buffer = worktree + .update(cx, |worktree, cx| { + worktree.open_buffer( + Path::new(&message.path), + state.language_registry.clone(), + cx, + ) + }) + .await?; + state + .shared_buffers + .entry(peer_id) + .or_default() + .insert(buffer.id(), buffer.clone()); + + rpc.respond( + request.receipt(), + proto::OpenBufferResponse { + buffer_id: buffer.id() as u64, + buffer: Some(buffer.read_with(cx, |buf, _| buf.to_proto())), + }, + ) + .await?; - // rpc.respond(request.receipt(), proto::OpenFileResponse { id, mtime }) - // .await?; Ok(()) } - pub async fn close_file( - request: TypedEnvelope, + pub async fn close_buffer( + request: TypedEnvelope, rpc: &rpc::Client, _: &mut AsyncAppContext, ) -> anyhow::Result<()> { @@ -169,33 +179,6 @@ mod remote { Ok(()) } - - pub async fn open_buffer( - request: TypedEnvelope, - rpc: &rpc::Client, - cx: &mut AsyncAppContext, - ) -> anyhow::Result<()> { - // let message = &request.payload; - // let handle = { - // let state = rpc.state.lock().await; - // let mut files = state.shared_files.keys(); - // files.find(|file| file.id() == message.id).cloned() - // }; - // let buffer = if let Some(handle) = handle { - // let history = cx.read(|cx| handle.load_history(cx)).await?; - // Some(proto::Buffer { - // content: history.base_text.to_string(), - // history: Vec::new(), - // }) - // } else { - // None - // }; - - // rpc.respond(request.receipt(), proto::OpenBufferResponse { buffer }) - // .await?; - - Ok(()) - } } pub trait Item: Entity + Sized { diff --git a/zed/src/worktree.rs b/zed/src/worktree.rs index e2e83f1f55007488d1245e63e1375b9adc96758d..973aa3a36366ca19b290935e98ea631c24e0ef1c 100644 --- a/zed/src/worktree.rs +++ b/zed/src/worktree.rs @@ -65,7 +65,7 @@ impl Worktree { } pub fn remote( - id: usize, + id: u64, worktree: proto::Worktree, rpc: rpc::Client, connection_id: ConnectionId, @@ -98,6 +98,14 @@ impl Worktree { } } + pub fn as_remote_mut(&mut self) -> Option<&mut RemoteWorktree> { + if let Worktree::Remote(worktree) = self { + Some(worktree) + } else { + None + } + } + pub fn snapshot(&self) -> Snapshot { match self { Worktree::Local(worktree) => worktree.snapshot(), @@ -110,10 +118,12 @@ impl Worktree { path: impl AsRef, language_registry: Arc, cx: &mut ModelContext, - ) -> impl Future>> + 'static { + ) -> Task>> { match self { Worktree::Local(worktree) => worktree.open_buffer(path.as_ref(), language_registry, cx), - Worktree::Remote(_) => todo!(), + Worktree::Remote(worktree) => { + worktree.open_buffer(path.as_ref(), language_registry, cx) + } } } @@ -148,7 +158,7 @@ pub struct LocalWorktree { _event_stream_handle: fsevent::Handle, poll_scheduled: bool, rpc: Option, - open_buffers: HashSet>, + open_buffers: HashMap>, } impl LocalWorktree { @@ -220,12 +230,12 @@ impl LocalWorktree { path: &Path, language_registry: Arc, cx: &mut ModelContext, - ) -> impl Future>> + 'static { + ) -> Task>> { let handle = cx.handle(); // If there is already a buffer for the given path, then return it. let mut existing_buffer = None; - self.open_buffers.retain(|buffer| { + self.open_buffers.retain(|_buffer_id, buffer| { if let Some(buffer) = buffer.upgrade(cx.as_ref()) { if let Some(file) = buffer.read(cx.as_ref()).file() { if file.worktree_id() == handle.id() && file.path.as_ref() == path { @@ -238,12 +248,16 @@ impl LocalWorktree { } }); - let mut new_buffer = None; - if existing_buffer.is_none() { - let path = Arc::from(path); - let contents = self.load(&path, cx.as_ref()); - new_buffer = Some(cx.spawn(|this, mut cx| async move { - let contents = contents.await?; + let path = Arc::from(path); + cx.spawn(|this, mut cx| async move { + if let Some(existing_buffer) = existing_buffer { + Ok(existing_buffer) + } else { + let contents = this + .read_with(&cx, |this, cx| { + this.as_local().unwrap().load(&path, cx.as_ref()) + }) + .await?; let language = language_registry.select_language(&path).cloned(); let file = File::new(handle, path.into()); let buffer = cx.add_model(|cx| { @@ -251,19 +265,11 @@ impl LocalWorktree { }); this.update(&mut cx, |this, _| { let this = this.as_local_mut().unwrap(); - this.open_buffers.insert(buffer.downgrade()); + this.open_buffers.insert(buffer.id(), buffer.downgrade()); }); Ok(buffer) - })); - } - - async move { - if let Some(existing_buffer) = existing_buffer { - Ok(existing_buffer) - } else { - new_buffer.unwrap().await } - } + }) } pub fn scan_complete(&self) -> impl Future { @@ -292,7 +298,7 @@ impl LocalWorktree { fn observe_snapshot_diff(&mut self, diff: Diff, cx: &mut ModelContext) { let handle = cx.handle(); - self.open_buffers.retain(|buffer| { + self.open_buffers.retain(|_buffer_id, buffer| { if let Some(buffer) = buffer.upgrade(cx.as_ref()) { buffer.update(cx, |buffer, cx| { let handle = handle.clone(); @@ -506,16 +512,17 @@ impl fmt::Debug for LocalWorktree { } pub struct RemoteWorktree { - remote_id: usize, + remote_id: u64, snapshot: Snapshot, rpc: rpc::Client, connection_id: ConnectionId, replica_id: ReplicaId, + open_buffers: HashMap>, } impl RemoteWorktree { fn new( - remote_id: usize, + remote_id: u64, worktree: proto::Worktree, rpc: rpc::Client, connection_id: ConnectionId, @@ -568,8 +575,64 @@ impl RemoteWorktree { rpc, connection_id, replica_id, + open_buffers: Default::default(), } } + + pub fn open_buffer( + &mut self, + path: &Path, + language_registry: Arc, + cx: &mut ModelContext, + ) -> Task>> { + let handle = cx.handle(); + let mut existing_buffer = None; + self.open_buffers.retain(|_buffer_id, buffer| { + if let Some(buffer) = buffer.upgrade(cx.as_ref()) { + if let Some(file) = buffer.read(cx.as_ref()).file() { + if file.worktree_id() == handle.id() && file.path.as_ref() == path { + existing_buffer = Some(buffer); + } + } + true + } else { + false + } + }); + + let rpc = self.rpc.clone(); + let replica_id = self.replica_id; + let connection_id = self.connection_id; + let remote_worktree_id = self.remote_id; + let path = path.to_string_lossy().to_string(); + cx.spawn(|this, mut cx| async move { + if let Some(existing_buffer) = existing_buffer { + Ok(existing_buffer) + } else { + let file = File::new(handle, Path::new(&path).into()); + let language = language_registry.select_language(&path).cloned(); + let response = rpc + .request( + connection_id, + proto::OpenBuffer { + worktree_id: remote_worktree_id as u64, + path, + }, + ) + .await?; + let buffer_id = response.buffer_id; + let remote_buffer = response.buffer.ok_or_else(|| anyhow!("empty buffer"))?; + let buffer = cx.add_model(|cx| { + Buffer::from_proto(replica_id, remote_buffer, Some(file), language, cx).unwrap() + }); + this.update(&mut cx, |this, _| { + let this = this.as_remote_mut().unwrap(); + this.open_buffers.insert(buffer_id, buffer.downgrade()); + }); + Ok(buffer) + } + }) + } } #[derive(Clone)] @@ -833,7 +896,9 @@ impl File { pub fn saved_buffer(&self, buffer: ModelHandle, cx: &mut MutableAppContext) { self.worktree.update(cx, |worktree, _| { if let Worktree::Local(worktree) = worktree { - worktree.open_buffers.insert(buffer.downgrade()); + worktree + .open_buffers + .insert(buffer.id(), buffer.downgrade()); } }) }