diff --git a/zed-rpc/proto/zed.proto b/zed-rpc/proto/zed.proto index 0910fd041a3cf8671c0add7d23f622ad4fb1a48a..07819e31c638b436404f691477ce1a61fa6771b5 100644 --- a/zed-rpc/proto/zed.proto +++ b/zed-rpc/proto/zed.proto @@ -15,7 +15,7 @@ message Envelope { OpenBuffer open_buffer = 10; OpenBufferResponse open_buffer_response = 11; CloseBuffer close_buffer = 12; - EditBuffer edit_buffer = 13; + UpdateBuffer update_buffer = 13; } } @@ -70,7 +70,7 @@ message CloseBuffer { uint64 buffer_id = 2; } -message EditBuffer { +message UpdateBuffer { uint64 worktree_id = 1; uint64 buffer_id = 2; repeated Operation operations = 3; @@ -99,13 +99,31 @@ message Entry { message Buffer { uint64 id = 1; string content = 2; - repeated Operation history = 3; + repeated Operation.Edit history = 3; +} + +message Selection { + uint64 id = 1; + Anchor start = 2; + Anchor end = 3; + bool reversed = 4; +} + +message Anchor { + repeated VectorClockEntry version = 1; + uint64 offset = 2; + Bias bias = 3; + enum Bias { + LEFT = 0; + Right = 1; + } } message Operation { oneof variant { Edit edit = 1; Undo undo = 2; + UpdateSelections update_selections = 3; } message Edit { @@ -125,6 +143,13 @@ message Operation { uint32 edit_local_timestamp = 5; uint32 count = 6; } + + message UpdateSelections { + uint32 replica_id = 1; + uint32 local_timestamp = 2; + uint32 lamport_timestamp = 3; + repeated Selection selections = 4; + } } message VectorClockEntry { diff --git a/zed-rpc/src/proto.rs b/zed-rpc/src/proto.rs index e1eafcd0cd11247f5f730d2fe573f16784c50897..be422c15014fd4ff9d6ea77c49b03334c55b1b9a 100644 --- a/zed-rpc/src/proto.rs +++ b/zed-rpc/src/proto.rs @@ -73,6 +73,7 @@ request_message!(ShareWorktree, ShareWorktreeResponse); request_message!(OpenWorktree, OpenWorktreeResponse); request_message!(OpenBuffer, OpenBufferResponse); message!(CloseBuffer); +message!(UpdateBuffer); /// A stream of protobuf messages. pub struct MessageStream { diff --git a/zed/src/editor/buffer.rs b/zed/src/editor/buffer.rs index 6acffe9c7ce362fecdf26901c06c2fb2663cd99a..c238bad806ab52fc2d9945f68ffc6f896572a1a7 100644 --- a/zed/src/editor/buffer.rs +++ b/zed/src/editor/buffer.rs @@ -16,7 +16,6 @@ use zed_rpc::proto; use crate::{ language::{Language, Tree}, operation_queue::{self, OperationQueue}, - rpc, settings::{StyleId, ThemeMap}, sum_tree::{self, FilterCursor, SumTree}, time::{self, ReplicaId}, @@ -29,6 +28,7 @@ use lazy_static::lazy_static; use std::{ cell::RefCell, cmp, + convert::{TryFrom, TryInto}, hash::BuildHasher, iter::Iterator, ops::{Deref, DerefMut, Range}, @@ -118,7 +118,6 @@ pub struct Buffer { undo_map: UndoMap, history: History, file: Option, - rpc: Option, language: Option>, syntax_tree: Mutex>, is_parsing: bool, @@ -127,15 +126,13 @@ pub struct Buffer { deferred_ops: OperationQueue, deferred_replicas: HashSet, replica_id: ReplicaId, - remote_id: Option, + remote_id: u64, local_clock: time::Local, lamport_clock: time::Lamport, - operation_callback: Option, + #[cfg(test)] + operations: Vec, } -type OperationCallback = - Box)>; - #[derive(Clone)] struct SyntaxTree { tree: Tree, @@ -427,8 +424,7 @@ impl Buffer { replica_id, History::new(base_text.into()), None, - None, - None, + cx.model_id() as u64, None, cx, ) @@ -441,15 +437,21 @@ impl Buffer { language: Option>, cx: &mut ModelContext, ) -> Self { - Self::build(replica_id, history, file, None, None, language, cx) + Self::build( + replica_id, + history, + file, + cx.model_id() as u64, + language, + cx, + ) } fn build( replica_id: ReplicaId, history: History, file: Option, - rpc: Option, - remote_id: Option, + remote_id: u64, language: Option>, cx: &mut ModelContext, ) -> Self { @@ -486,7 +488,6 @@ impl Buffer { undo_map: Default::default(), history, file, - rpc, syntax_tree: Mutex::new(None), is_parsing: false, language, @@ -497,9 +498,11 @@ impl Buffer { deferred_replicas: HashSet::default(), replica_id, remote_id, - operation_callback: None, local_clock: time::Local::new(replica_id), lamport_clock: time::Lamport::new(replica_id), + + #[cfg(test)] + operations: Default::default(), }; result.reparse(cx); result @@ -518,7 +521,6 @@ impl Buffer { replica_id: ReplicaId, message: proto::Buffer, file: Option, - rpc: rpc::Client, language: Option>, cx: &mut ModelContext, ) -> Result { @@ -526,96 +528,20 @@ impl Buffer { replica_id, History::new(message.content.into()), file, - Some(rpc), - Some(message.id), + message.id, language, cx, ); let ops = message .history .into_iter() - .filter_map(|op| op.variant) - .map(|op| match op { - proto::operation::Variant::Edit(edit) => { - let mut version = time::Global::new(); - for entry in edit.version { - version.observe(time::Local { - replica_id: entry.replica_id as ReplicaId, - value: entry.timestamp, - }); - } - let ranges = edit - .ranges - .into_iter() - .map(|range| range.start as usize..range.end as usize) - .collect(); - Operation::Edit(EditOperation { - timestamp: InsertionTimestamp { - replica_id: edit.replica_id as ReplicaId, - local: edit.local_timestamp, - lamport: edit.lamport_timestamp, - }, - version, - ranges, - new_text: edit.new_text, - }) - } - proto::operation::Variant::Undo(undo) => Operation::Undo { - lamport_timestamp: time::Lamport { - replica_id: undo.replica_id as ReplicaId, - value: undo.lamport_timestamp, - }, - undo: UndoOperation { - id: time::Local { - replica_id: undo.replica_id as ReplicaId, - value: undo.local_timestamp, - }, - edit_id: time::Local { - replica_id: undo.edit_replica_id as ReplicaId, - value: undo.edit_local_timestamp, - }, - count: undo.count, - }, - }, - }); + .map(|op| Operation::Edit(op.into())); buffer.apply_ops(ops, cx)?; Ok(buffer) } pub fn to_proto(&self, cx: &mut ModelContext) -> proto::Buffer { - let ops = self - .history - .ops - .values() - .map(|op| { - let version = op - .version - .iter() - .map(|entry| proto::VectorClockEntry { - replica_id: entry.replica_id as u32, - timestamp: entry.value, - }) - .collect(); - let ranges = op - .ranges - .iter() - .map(|range| proto::Range { - start: range.start as u64, - end: range.end as u64, - }) - .collect(); - proto::Operation { - variant: Some(proto::operation::Variant::Edit(proto::operation::Edit { - replica_id: op.timestamp.replica_id as u32, - local_timestamp: op.timestamp.local, - lamport_timestamp: op.timestamp.lamport, - version, - ranges, - new_text: op.new_text.clone(), - })), - } - }) - .collect(); + let ops = self.history.ops.values().map(Into::into).collect(); proto::Buffer { id: cx.model_id() as u64, content: self.history.base_text.to_string(), @@ -657,7 +583,7 @@ impl Buffer { ) { if let Some(new_file) = new_file { let buffer = cx.handle(); - new_file.saved_buffer(buffer, cx.as_mut()); + new_file.buffer_added(buffer, cx.as_mut()); self.file = Some(new_file); } if let Some(file) = &self.file { @@ -907,6 +833,10 @@ impl Buffer { .map_or(false, |file| file.mtime(cx) > self.saved_mtime) } + pub fn remote_id(&self) -> u64 { + self.remote_id + } + pub fn version(&self) -> time::Global { self.version.clone() } @@ -1406,17 +1336,16 @@ impl Buffer { self.lamport_clock.observe(timestamp.lamport()); } + #[cfg(not(test))] pub fn send_operation(&mut self, operation: Operation, cx: &mut ModelContext) { - if let Some(operation_callback) = self.operation_callback.as_mut() { - operation_callback(operation, cx); + if let Some(file) = &self.file { + file.buffer_updated(cx.handle(), operation, cx.as_mut()); } } - pub fn on_operation( - &mut self, - callback: impl FnMut(Operation, &mut ModelContext) + Send + Sync + 'static, - ) { - self.operation_callback = Some(Box::new(callback)); + #[cfg(test)] + pub fn send_operation(&mut self, operation: Operation, _: &mut ModelContext) { + self.operations.push(operation); } pub fn undo(&mut self, cx: &mut ModelContext) { @@ -1821,16 +1750,17 @@ impl Clone for Buffer { selections_last_update: self.selections_last_update.clone(), deferred_ops: self.deferred_ops.clone(), file: self.file.clone(), - rpc: self.rpc.clone(), language: self.language.clone(), syntax_tree: Mutex::new(self.syntax_tree.lock().clone()), is_parsing: false, deferred_replicas: self.deferred_replicas.clone(), replica_id: self.replica_id, - operation_callback: None, remote_id: self.remote_id.clone(), local_clock: self.local_clock.clone(), lamport_clock: self.lamport_clock.clone(), + + #[cfg(test)] + operations: self.operations.clone(), } } } @@ -1967,22 +1897,8 @@ impl Entity for Buffer { type Event = Event; fn release(&mut self, cx: &mut gpui::MutableAppContext) { - if let (Some(buffer_id), Some(file)) = (self.remote_id, self.file.as_ref()) { - let rpc = self.rpc.clone().unwrap(); - let worktree_id = file.worktree_id() as u64; - cx.background() - .spawn(async move { - if let Err(error) = rpc - .send(proto::CloseBuffer { - worktree_id, - buffer_id, - }) - .await - { - log::error!("error closing remote buffer: {}", error); - }; - }) - .detach(); + if let Some(file) = self.file.as_ref() { + file.buffer_removed(self.remote_id, cx); } } } @@ -2321,6 +2237,232 @@ impl Operation { } } +impl<'a> Into for &'a Operation { + fn into(self) -> proto::Operation { + proto::Operation { + variant: Some(match self { + Operation::Edit(edit) => proto::operation::Variant::Edit(edit.into()), + Operation::Undo { + undo, + lamport_timestamp, + } => proto::operation::Variant::Undo(proto::operation::Undo { + replica_id: undo.id.replica_id as u32, + local_timestamp: undo.id.value, + lamport_timestamp: lamport_timestamp.value, + edit_replica_id: undo.edit_id.replica_id as u32, + edit_local_timestamp: undo.edit_id.value, + count: undo.count, + }), + Operation::UpdateSelections { + set_id, + selections, + lamport_timestamp, + } => proto::operation::Variant::UpdateSelections( + proto::operation::UpdateSelections { + replica_id: set_id.replica_id as u32, + local_timestamp: set_id.value, + lamport_timestamp: lamport_timestamp.value, + selections: selections.as_ref().map_or(Vec::new(), |selections| { + selections + .iter() + .map(|selection| proto::Selection { + id: selection.id as u64, + start: Some((&selection.start).into()), + end: Some((&selection.end).into()), + reversed: selection.reversed, + }) + .collect() + }), + }, + ), + }), + } + } +} + +impl<'a> Into for &'a EditOperation { + fn into(self) -> proto::operation::Edit { + let version = self + .version + .iter() + .map(|entry| proto::VectorClockEntry { + replica_id: entry.replica_id as u32, + timestamp: entry.value, + }) + .collect(); + let ranges = self + .ranges + .iter() + .map(|range| proto::Range { + start: range.start as u64, + end: range.end as u64, + }) + .collect(); + proto::operation::Edit { + replica_id: self.timestamp.replica_id as u32, + local_timestamp: self.timestamp.local, + lamport_timestamp: self.timestamp.lamport, + version, + ranges, + new_text: self.new_text.clone(), + } + } +} + +impl<'a> Into for &'a Anchor { + fn into(self) -> proto::Anchor { + match self { + Anchor::Middle { + offset, + bias, + version, + } => proto::Anchor { + version: version + .iter() + .map(|entry| proto::VectorClockEntry { + replica_id: entry.replica_id as u32, + timestamp: entry.value, + }) + .collect(), + offset: *offset as u64, + bias: match bias { + Bias::Left => proto::anchor::Bias::Left as i32, + Bias::Right => proto::anchor::Bias::Right as i32, + }, + }, + Anchor::Start => proto::Anchor { + version: Vec::new(), + bias: proto::anchor::Bias::Left as i32, + offset: 0, + }, + Anchor::End => proto::Anchor { + version: Vec::new(), + bias: proto::anchor::Bias::Right as i32, + offset: u64::MAX, + }, + } + } +} + +impl TryFrom for Operation { + type Error = anyhow::Error; + + fn try_from(message: proto::Operation) -> Result { + Ok( + match message + .variant + .ok_or_else(|| anyhow!("missing operation variant"))? + { + proto::operation::Variant::Edit(edit) => Operation::Edit(edit.into()), + proto::operation::Variant::Undo(undo) => Operation::Undo { + lamport_timestamp: time::Lamport { + replica_id: undo.replica_id as ReplicaId, + value: undo.lamport_timestamp, + }, + undo: UndoOperation { + id: time::Local { + replica_id: undo.replica_id as ReplicaId, + value: undo.local_timestamp, + }, + edit_id: time::Local { + replica_id: undo.edit_replica_id as ReplicaId, + value: undo.edit_local_timestamp, + }, + count: undo.count, + }, + }, + proto::operation::Variant::UpdateSelections(message) => { + Operation::UpdateSelections { + set_id: time::Lamport { + replica_id: message.replica_id as ReplicaId, + value: message.local_timestamp, + }, + lamport_timestamp: time::Lamport { + replica_id: message.replica_id as ReplicaId, + value: message.lamport_timestamp, + }, + selections: Some( + message + .selections + .into_iter() + .map(|selection| { + Ok(Selection { + id: selection.id as usize, + start: selection + .start + .ok_or_else(|| anyhow!("missing selection start"))? + .try_into()?, + end: selection + .end + .ok_or_else(|| anyhow!("missing selection end"))? + .try_into()?, + reversed: selection.reversed, + goal: SelectionGoal::None, + }) + }) + .collect::, anyhow::Error>>()? + .into(), + ), + } + } + }, + ) + } +} + +impl From for EditOperation { + fn from(edit: proto::operation::Edit) -> Self { + let mut version = time::Global::new(); + for entry in edit.version { + version.observe(time::Local { + replica_id: entry.replica_id as ReplicaId, + value: entry.timestamp, + }); + } + let ranges = edit + .ranges + .into_iter() + .map(|range| range.start as usize..range.end as usize) + .collect(); + EditOperation { + timestamp: InsertionTimestamp { + replica_id: edit.replica_id as ReplicaId, + local: edit.local_timestamp, + lamport: edit.lamport_timestamp, + }, + version, + ranges, + new_text: edit.new_text, + } + } +} + +impl TryFrom for Anchor { + type Error = anyhow::Error; + + fn try_from(message: proto::Anchor) -> Result { + let mut version = time::Global::new(); + for entry in message.version { + version.observe(time::Local { + replica_id: entry.replica_id as ReplicaId, + value: entry.timestamp, + }); + } + + Ok(Self::Middle { + offset: message.offset as usize, + bias: if message.bias == proto::anchor::Bias::Left as i32 { + Bias::Left + } else if message.bias == proto::anchor::Bias::Right as i32 { + Bias::Right + } else { + Err(anyhow!("invalid anchor bias {}", message.bias))? + }, + version, + }) + } +} + impl operation_queue::Operation for Operation { fn timestamp(&self) -> time::Lamport { self.lamport_timestamp() @@ -2419,13 +2561,7 @@ mod tests { let buffer1 = cx.add_model(|cx| Buffer::new(0, "abcdef", cx)); let buffer2 = cx.add_model(|cx| Buffer::new(1, "abcdef", cx)); - let buffer_ops = Arc::new(Mutex::new(Vec::new())); - buffer1.update(cx, |buffer, cx| { - buffer.on_operation({ - let buffer_ops = buffer_ops.clone(); - move |op, _| buffer_ops.lock().push(op) - }); - + let buffer_ops = buffer1.update(cx, |buffer, cx| { let buffer_1_events = buffer_1_events.clone(); cx.subscribe(&buffer1, move |_, event, _| { buffer_1_events.borrow_mut().push(event.clone()) @@ -2452,14 +2588,14 @@ mod tests { // Undoing a transaction emits one edited event. buffer.undo(cx); + + buffer.operations.clone() }); // Incorporating a set of remote ops emits a single edited event, // followed by a dirtied event. buffer2.update(cx, |buffer, cx| { - buffer - .apply_ops::>(mem::take(buffer_ops.lock().as_mut()), cx) - .unwrap(); + buffer.apply_ops(buffer_ops, cx).unwrap(); }); let buffer_1_events = buffer_1_events.borrow(); @@ -3080,22 +3216,14 @@ mod tests { cx.add_model(|cx| { let mut buffer = Buffer::new(0, "1234", cx); - let operations = Arc::new(Mutex::new(Vec::new())); - buffer.on_operation({ - let edits = operations.clone(); - move |operation, _| { - edits.lock().push(operation); - } - }); - buffer.edit(vec![1..1], "abx", cx); buffer.edit(vec![3..4], "yzef", cx); buffer.edit(vec![3..5], "cd", cx); assert_eq!(buffer.text(), "1abcdef234"); - let edit1 = operations.lock()[0].clone(); - let edit2 = operations.lock()[1].clone(); - let edit3 = operations.lock()[2].clone(); + let edit1 = buffer.operations[0].clone(); + let edit2 = buffer.operations[1].clone(); + let edit3 = buffer.operations[2].clone(); buffer.undo_or_redo(edit1.edit_id().unwrap(), cx).unwrap(); assert_eq!(buffer.text(), "1cdef234"); @@ -3194,40 +3322,22 @@ mod tests { let buffer2 = cx.add_model(|cx| Buffer::new(2, text, cx)); let buffer3 = cx.add_model(|cx| Buffer::new(3, text, cx)); - let ops = Arc::new(Mutex::new(Vec::new())); - - buffer1.update(cx, |buffer, cx| { - buffer.on_operation({ - let ops = ops.clone(); - move |operation, _| ops.lock().push(operation) - }); - + let buf1_op = buffer1.update(cx, |buffer, cx| { buffer.edit(vec![1..2], "12", cx); assert_eq!(buffer.text(), "a12cdef"); + buffer.operations.last().unwrap().clone() }); - buffer2.update(cx, |buffer, cx| { - buffer.on_operation({ - let ops = ops.clone(); - move |operation, _| ops.lock().push(operation) - }); - + let buf2_op = buffer2.update(cx, |buffer, cx| { buffer.edit(vec![3..4], "34", cx); assert_eq!(buffer.text(), "abc34ef"); + buffer.operations.last().unwrap().clone() }); - buffer3.update(cx, |buffer, cx| { - buffer.on_operation({ - let ops = ops.clone(); - move |operation, _| ops.lock().push(operation) - }); - + let buf3_op = buffer3.update(cx, |buffer, cx| { buffer.edit(vec![5..6], "56", cx); assert_eq!(buffer.text(), "abcde56"); + buffer.operations.last().unwrap().clone() }); - let buf1_op = ops.lock()[0].clone(); - let buf2_op = ops.lock()[1].clone(); - let buf3_op = ops.lock()[2].clone(); - buffer1.update(cx, |buffer, _| { buffer.apply_op(buf2_op.clone()).unwrap(); buffer.apply_op(buf3_op.clone()).unwrap(); @@ -3265,7 +3375,6 @@ mod tests { for seed in start_seed..start_seed + iterations { dbg!(seed); let mut rng = StdRng::seed_from_u64(seed); - let network = Arc::new(Mutex::new(Network::new(StdRng::seed_from_u64(seed)))); let base_text_len = rng.gen_range(0..10); let base_text = RandomCharIter::new(&mut rng) @@ -3273,22 +3382,14 @@ mod tests { .collect::(); let mut replica_ids = Vec::new(); let mut buffers = Vec::new(); + let mut network = Network::new(StdRng::seed_from_u64(seed)); + for i in 0..peers { - let buffer = cx.add_model(|cx| { - let replica_id = i as ReplicaId; - let mut buffer = Buffer::new(replica_id, base_text.as_str(), cx); - buffer.on_operation({ - let network = network.clone(); - move |op, _| { - network.lock().broadcast(replica_id, vec![op]); - } - }); - buffer - }); + let buffer = cx.add_model(|cx| Buffer::new(i as ReplicaId, base_text.as_str(), cx)); buffers.push(buffer); replica_ids.push(i as u16); - network.lock().add_peer(i as u16); + network.add_peer(i as u16); } log::info!("initial text: {:?}", base_text); @@ -3300,15 +3401,17 @@ mod tests { buffers[replica_index].update(cx, |buffer, cx| match rng.gen_range(0..=100) { 0..=50 if mutation_count != 0 => { buffer.randomly_mutate(&mut rng, cx); + network.broadcast(buffer.replica_id, mem::take(&mut buffer.operations)); log::info!("buffer {} text: {:?}", buffer.replica_id, buffer.text()); mutation_count -= 1; } 51..=70 if mutation_count != 0 => { buffer.randomly_undo_redo(&mut rng, cx); + network.broadcast(buffer.replica_id, mem::take(&mut buffer.operations)); mutation_count -= 1; } - 71..=100 if network.lock().has_unreceived(replica_id) => { - let ops = network.lock().receive(replica_id); + 71..=100 if network.has_unreceived(replica_id) => { + let ops = network.receive(replica_id); if !ops.is_empty() { log::info!( "peer {} applying {} ops from the network.", @@ -3321,7 +3424,7 @@ mod tests { _ => {} }); - if mutation_count == 0 && network.lock().is_idle() { + if mutation_count == 0 && network.is_idle() { break; } } diff --git a/zed/src/worktree.rs b/zed/src/worktree.rs index 746af63bf70754d59c6e94644fb42f8b63ef75a8..1e6265e9c701119c64bd2f2258bd35d98ab66082 100644 --- a/zed/src/worktree.rs +++ b/zed/src/worktree.rs @@ -4,7 +4,7 @@ mod ignore; use self::{char_bag::CharBag, ignore::IgnoreStack}; use crate::{ - editor::{Buffer, History, Rope}, + editor::{Buffer, History, Operation, Rope}, language::LanguageRegistry, rpc::{self, proto}, sum_tree::{self, Cursor, Edit, SumTree}, @@ -46,6 +46,7 @@ lazy_static! { pub fn init(cx: &mut MutableAppContext, rpc: rpc::Client) { rpc.on_message(remote::open_buffer, cx); rpc.on_message(remote::close_buffer, cx); + rpc.on_message(remote::update_buffer, cx); } #[derive(Clone, Debug)] @@ -194,7 +195,7 @@ pub struct LocalWorktree { scan_state: (watch::Sender, watch::Receiver), _event_stream_handle: fsevent::Handle, poll_scheduled: bool, - rpc: Option, + rpc: Option<(rpc::Client, u64)>, open_buffers: HashMap>, } @@ -298,22 +299,7 @@ impl LocalWorktree { let language = language_registry.select_language(&path).cloned(); let file = File::new(handle, path.into()); let buffer = cx.add_model(|cx| { - let mut buffer = Buffer::from_history( - 0, - History::new(contents.into()), - Some(file), - language, - cx, - ); - buffer.on_operation({ - let worktree = handle.clone(); - move |operation, cx| { - worktree.update(cx, |tree, cx| { - // tree.buffer_changed(cx.model_id(), operation) - }); - } - }); - buffer + Buffer::from_history(0, History::new(contents.into()), Some(file), language, cx) }); this.update(&mut cx, |this, _| { let this = this.as_local_mut().unwrap(); @@ -500,14 +486,13 @@ impl LocalWorktree { pub fn share( &mut self, - client: rpc::Client, + rpc: rpc::Client, cx: &mut ModelContext, ) -> Task> { - self.rpc = Some(client.clone()); let root_name = self.root_name.clone(); let snapshot = self.snapshot(); let handle = cx.handle(); - cx.spawn(|_this, cx| async move { + cx.spawn(|this, mut cx| async move { let entries = cx .background() .spawn(async move { @@ -526,20 +511,23 @@ impl LocalWorktree { }) .await; - let share_response = client + let share_response = rpc .request(proto::ShareWorktree { worktree: Some(proto::Worktree { root_name, entries }), }) .await?; - client - .state + rpc.state .lock() .await .shared_worktrees .insert(share_response.worktree_id, handle); log::info!("sharing worktree {:?}", share_response); + + this.update(&mut cx, |worktree, _| { + worktree.as_local_mut().unwrap().rpc = Some((rpc, share_response.worktree_id)); + }); Ok((share_response.worktree_id, share_response.access_token)) }) } @@ -664,8 +652,7 @@ impl RemoteWorktree { let remote_buffer = response.buffer.ok_or_else(|| anyhow!("empty buffer"))?; let buffer_id = remote_buffer.id; let buffer = cx.add_model(|cx| { - Buffer::from_proto(replica_id, remote_buffer, Some(file), rpc, language, cx) - .unwrap() + Buffer::from_proto(replica_id, remote_buffer, Some(file), language, cx).unwrap() }); this.update(&mut cx, |this, _| { let this = this.as_remote_mut().unwrap(); @@ -936,7 +923,7 @@ impl File { Self { worktree, path } } - pub fn saved_buffer(&self, buffer: ModelHandle, cx: &mut MutableAppContext) { + pub fn buffer_added(&self, buffer: ModelHandle, cx: &mut MutableAppContext) { self.worktree.update(cx, |worktree, _| { if let Worktree::Local(worktree) = worktree { worktree @@ -946,6 +933,58 @@ impl File { }) } + pub fn buffer_updated( + &self, + buffer: ModelHandle, + operation: Operation, + cx: &mut MutableAppContext, + ) { + let buffer_id = buffer.read(cx).remote_id(); + self.worktree.update(cx, |worktree, cx| { + if let Some((rpc, remote_id)) = match worktree { + Worktree::Local(worktree) => worktree.rpc.clone(), + Worktree::Remote(worktree) => Some((worktree.rpc.clone(), worktree.remote_id)), + } { + cx.background() + .spawn(async move { + if let Err(error) = rpc + .send(proto::UpdateBuffer { + worktree_id: remote_id, + buffer_id, + operations: Some(operation).iter().map(Into::into).collect(), + }) + .await + { + log::error!("error sending buffer operation: {}", error); + } + }) + .detach(); + } + }); + } + + pub fn buffer_removed(&self, buffer_id: u64, cx: &mut MutableAppContext) { + self.worktree.update(cx, |worktree, cx| { + if let Worktree::Remote(worktree) = worktree { + let worktree_id = worktree.remote_id; + let rpc = worktree.rpc.clone(); + cx.background() + .spawn(async move { + if let Err(error) = rpc + .send(proto::CloseBuffer { + worktree_id, + buffer_id, + }) + .await + { + log::error!("error closing remote buffer: {}", error); + }; + }) + .detach(); + } + }); + } + /// Returns this file's path relative to the root of its worktree. pub fn path(&self) -> Arc { self.path.clone() @@ -1751,16 +1790,18 @@ impl<'a> Iterator for ChildEntriesIter<'a> { } mod remote { + use std::convert::TryInto; + use super::*; use crate::rpc::TypedEnvelope; pub async fn open_buffer( - request: TypedEnvelope, + envelope: TypedEnvelope, rpc: &rpc::Client, cx: &mut AsyncAppContext, ) -> anyhow::Result<()> { - let message = &request.payload; - let peer_id = request + let message = &envelope.payload; + let peer_id = envelope .original_sender_id .ok_or_else(|| anyhow!("missing original sender id"))?; @@ -1787,7 +1828,7 @@ mod remote { .insert(buffer.id() as u64, buffer.clone()); rpc.respond( - request.receipt(), + envelope.receipt(), proto::OpenBufferResponse { buffer: Some(buffer.update(cx, |buf, cx| buf.to_proto(cx))), }, @@ -1798,20 +1839,53 @@ mod remote { } pub async fn close_buffer( - message: TypedEnvelope, + envelope: TypedEnvelope, rpc: &rpc::Client, _: &mut AsyncAppContext, ) -> anyhow::Result<()> { - let peer_id = message + let peer_id = envelope .original_sender_id .ok_or_else(|| anyhow!("missing original sender id"))?; - let message = &message.payload; + let message = &envelope.payload; let mut state = rpc.state.lock().await; state.shared_buffers.entry(peer_id).and_modify(|buffers| { buffers.remove(&message.buffer_id); }); Ok(()) } + + pub async fn update_buffer( + envelope: TypedEnvelope, + rpc: &rpc::Client, + cx: &mut AsyncAppContext, + ) -> anyhow::Result<()> { + let peer_id = envelope + .original_sender_id + .ok_or_else(|| anyhow!("missing original sender id"))?; + let message = envelope.payload; + if let Some(buffer) = rpc + .state + .lock() + .await + .shared_buffers + .get(&peer_id) + .and_then(|buffers| buffers.get(&message.buffer_id)) + .cloned() + { + if let Err(error) = buffer.update(cx, |buffer, cx| { + let ops = message + .operations + .into_iter() + .map(|op| op.try_into()) + .collect::>>()?; + buffer.apply_ops(ops, cx)?; + Ok::<(), anyhow::Error>(()) + }) { + log::error!("error applying buffer operations {}", error); + } + } + Ok(()) + } } #[cfg(test)]