From 624eb5907e4997ae17a4437dc532a0d161e5fa16 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Tue, 8 Feb 2022 14:50:47 +0100 Subject: [PATCH] Serialize buffer in terms of operations rather than state This is required because, after joining, we want to be able to refer to operations that have happened prior to joining, which are not captured by the state. There is probably a way of reconstructing operations from the state, but that seems unnecessary and we've already talked about wanting to have the server store operations rather than state once we start persisting worktrees. --- crates/language/src/buffer.rs | 97 ++++++++++----------------- crates/language/src/proto.rs | 57 +++++++++++++--- crates/language/src/tests.rs | 8 +-- crates/rpc/proto/zed.proto | 19 +++--- crates/text/src/tests.rs | 12 ++-- crates/text/src/text.rs | 123 +++++++++++++--------------------- 6 files changed, 146 insertions(+), 170 deletions(-) diff --git a/crates/language/src/buffer.rs b/crates/language/src/buffer.rs index dc7a50151b39ba487a1cf14d04fc58fb5143f5d3..127b6831f463636856d79d3e04bacf89198312c0 100644 --- a/crates/language/src/buffer.rs +++ b/crates/language/src/buffer.rs @@ -147,12 +147,12 @@ pub enum Operation { lamport_timestamp: clock::Lamport, }, UpdateSelections { - replica_id: ReplicaId, selections: Arc<[Selection]>, lamport_timestamp: clock::Lamport, }, UpdateCompletionTriggers { triggers: Vec, + lamport_timestamp: clock::Lamport, }, } @@ -447,27 +447,19 @@ impl Buffer { file: Option>, cx: &mut ModelContext, ) -> Result { - let fragments_len = message.fragments.len(); - let buffer = TextBuffer::from_parts( + let buffer = TextBuffer::new( replica_id, message.id, - &message.visible_text, - &message.deleted_text, - message - .undo_map - .into_iter() - .map(proto::deserialize_undo_map_entry), - message - .fragments - .into_iter() - .enumerate() - .map(|(i, fragment)| { - proto::deserialize_buffer_fragment(fragment, i, fragments_len) - }), - message.lamport_timestamp, - From::from(message.version), + History::new(Arc::from(message.base_text)), ); let mut this = Self::build(buffer, file); + let ops = message + .operations + .into_iter() + .map(proto::deserialize_operation) + .collect::>>()?; + this.apply_ops(ops, cx)?; + for selection_set in message.selections { this.remote_selections.insert( selection_set.replica_id as ReplicaId, @@ -486,37 +478,24 @@ impl Buffer { DiagnosticSet::from_sorted_entries(entries.into_iter().cloned(), &snapshot), cx, ); - this.completion_triggers = message.completion_triggers; - let deferred_ops = message - .deferred_operations - .into_iter() - .map(proto::deserialize_operation) - .collect::>>()?; - this.apply_ops(deferred_ops, cx)?; - Ok(this) } pub fn to_proto(&self) -> proto::BufferState { + let mut operations = self + .text + .history() + .map(|op| proto::serialize_operation(&Operation::Buffer(op.clone()))) + .chain(self.deferred_ops.iter().map(proto::serialize_operation)) + .collect::>(); + operations.sort_unstable_by_key(proto::lamport_timestamp_for_operation); proto::BufferState { id: self.remote_id(), file: self.file.as_ref().map(|f| f.to_proto()), - visible_text: self.text.text(), - deleted_text: self.text.deleted_text(), - undo_map: self - .text - .undo_history() - .map(proto::serialize_undo_map_entry) - .collect(), - version: From::from(&self.version), - lamport_timestamp: self.lamport_clock.value, - fragments: self - .text - .fragments() - .map(proto::serialize_buffer_fragment) - .collect(), + base_text: self.base_text().to_string(), + operations, selections: self .remote_selections .iter() @@ -527,16 +506,6 @@ impl Buffer { }) .collect(), diagnostics: proto::serialize_diagnostics(self.diagnostics.iter()), - deferred_operations: self - .deferred_ops - .iter() - .map(proto::serialize_operation) - .chain( - self.text - .deferred_ops() - .map(|op| proto::serialize_operation(&Operation::Buffer(op.clone()))), - ) - .collect(), completion_triggers: self.completion_triggers.clone(), } } @@ -708,9 +677,13 @@ impl Buffer { .and_then(|c| c.trigger_characters) .unwrap_or_default(); this.update(&mut cx, |this, cx| { + let lamport_timestamp = this.text.lamport_clock.tick(); this.completion_triggers = triggers.clone(); this.send_operation( - Operation::UpdateCompletionTriggers { triggers }, + Operation::UpdateCompletionTriggers { + triggers, + lamport_timestamp, + }, cx, ); cx.notify(); @@ -1404,7 +1377,6 @@ impl Buffer { ); self.send_operation( Operation::UpdateSelections { - replica_id: self.text.replica_id(), selections, lamport_timestamp, }, @@ -1526,7 +1498,7 @@ impl Buffer { let new_text_len = new_text.len(); let edit = self.text.edit(ranges.iter().cloned(), new_text); - let edit_id = edit.timestamp.local(); + let edit_id = edit.local_timestamp(); if let Some((before_edit, edited)) = autoindent_request { let mut inserted = None; @@ -1555,7 +1527,7 @@ impl Buffer { } self.end_transaction(cx); - self.send_operation(Operation::Buffer(text::Operation::Edit(edit)), cx); + self.send_operation(Operation::Buffer(edit), cx); Some(edit_id) } @@ -1702,18 +1674,17 @@ impl Buffer { ); } Operation::UpdateSelections { - replica_id, selections, lamport_timestamp, } => { - if let Some(set) = self.remote_selections.get(&replica_id) { + if let Some(set) = self.remote_selections.get(&lamport_timestamp.replica_id) { if set.lamport_timestamp > lamport_timestamp { return; } } self.remote_selections.insert( - replica_id, + lamport_timestamp.replica_id, SelectionSet { selections, lamport_timestamp, @@ -1722,8 +1693,12 @@ impl Buffer { self.text.lamport_clock.observe(lamport_timestamp); self.selections_update_count += 1; } - Operation::UpdateCompletionTriggers { triggers } => { + Operation::UpdateCompletionTriggers { + triggers, + lamport_timestamp, + } => { self.completion_triggers = triggers; + self.text.lamport_clock.observe(lamport_timestamp); } } } @@ -2812,10 +2787,10 @@ impl operation_queue::Operation for Operation { } | Operation::UpdateSelections { lamport_timestamp, .. - } => *lamport_timestamp, - Operation::UpdateCompletionTriggers { .. } => { - unreachable!("updating completion triggers should never be deferred") } + | Operation::UpdateCompletionTriggers { + lamport_timestamp, .. + } => *lamport_timestamp, } } } diff --git a/crates/language/src/proto.rs b/crates/language/src/proto.rs index 2f23a1242e80923e39aeff93204775ce89ef9a72..62691583fb5f1e8b1dec4bcd8e348ecef7c7941e 100644 --- a/crates/language/src/proto.rs +++ b/crates/language/src/proto.rs @@ -45,11 +45,10 @@ pub fn serialize_operation(operation: &Operation) -> proto::Operation { version: From::from(&undo.version), }), Operation::UpdateSelections { - replica_id, selections, lamport_timestamp, } => proto::operation::Variant::UpdateSelections(proto::operation::UpdateSelections { - replica_id: *replica_id as u32, + replica_id: lamport_timestamp.replica_id as u32, lamport_timestamp: lamport_timestamp.value, selections: serialize_selections(selections), }), @@ -61,13 +60,16 @@ pub fn serialize_operation(operation: &Operation) -> proto::Operation { lamport_timestamp: lamport_timestamp.value, diagnostics: serialize_diagnostics(diagnostics.iter()), }), - Operation::UpdateCompletionTriggers { triggers } => { - proto::operation::Variant::UpdateCompletionTriggers( - proto::operation::UpdateCompletionTriggers { - triggers: triggers.clone(), - }, - ) - } + Operation::UpdateCompletionTriggers { + triggers, + lamport_timestamp, + } => proto::operation::Variant::UpdateCompletionTriggers( + proto::operation::UpdateCompletionTriggers { + replica_id: lamport_timestamp.replica_id as u32, + lamport_timestamp: lamport_timestamp.value, + triggers: triggers.clone(), + }, + ), }), } } @@ -233,7 +235,6 @@ pub fn deserialize_operation(message: proto::Operation) -> Result { .collect::>(); Operation::UpdateSelections { - replica_id: message.replica_id as ReplicaId, lamport_timestamp: clock::Lamport { replica_id: message.replica_id as ReplicaId, value: message.lamport_timestamp, @@ -251,6 +252,10 @@ pub fn deserialize_operation(message: proto::Operation) -> Result { proto::operation::Variant::UpdateCompletionTriggers(message) => { Operation::UpdateCompletionTriggers { triggers: message.triggers, + lamport_timestamp: clock::Lamport { + replica_id: message.replica_id as ReplicaId, + value: message.lamport_timestamp, + }, } } }, @@ -381,6 +386,38 @@ pub fn deserialize_anchor(anchor: proto::Anchor) -> Option { }) } +pub fn lamport_timestamp_for_operation(operation: &proto::Operation) -> Option { + let replica_id; + let value; + match operation.variant.as_ref()? { + proto::operation::Variant::Edit(op) => { + replica_id = op.replica_id; + value = op.lamport_timestamp; + } + proto::operation::Variant::Undo(op) => { + replica_id = op.replica_id; + value = op.lamport_timestamp; + } + proto::operation::Variant::UpdateDiagnostics(op) => { + replica_id = op.replica_id; + value = op.lamport_timestamp; + } + proto::operation::Variant::UpdateSelections(op) => { + replica_id = op.replica_id; + value = op.lamport_timestamp; + } + proto::operation::Variant::UpdateCompletionTriggers(op) => { + replica_id = op.replica_id; + value = op.lamport_timestamp; + } + } + + Some(clock::Lamport { + replica_id: replica_id as ReplicaId, + value, + }) +} + pub fn serialize_completion(completion: &Completion) -> proto::Completion { proto::Completion { old_start: Some(serialize_anchor(&completion.old_range.start)), diff --git a/crates/language/src/tests.rs b/crates/language/src/tests.rs index b852680bd622e2e9220120feedf88494499f4e74..0e436b495b3af306b22b4999fa3d3656f5ab59be 100644 --- a/crates/language/src/tests.rs +++ b/crates/language/src/tests.rs @@ -1072,15 +1072,15 @@ fn test_random_collaboration(cx: &mut MutableAppContext, mut rng: StdRng) { for buffer in &buffers { let buffer = buffer.read(cx).snapshot(); + let actual_remote_selections = buffer + .remote_selections_in_range(Anchor::min()..Anchor::max()) + .map(|(replica_id, selections)| (replica_id, selections.collect::>())) + .collect::>(); let expected_remote_selections = active_selections .iter() .filter(|(replica_id, _)| **replica_id != buffer.replica_id()) .map(|(replica_id, selections)| (*replica_id, selections.iter().collect::>())) .collect::>(); - let actual_remote_selections = buffer - .remote_selections_in_range(Anchor::min()..Anchor::max()) - .map(|(replica_id, selections)| (replica_id, selections.collect::>())) - .collect::>(); assert_eq!(actual_remote_selections, expected_remote_selections); } } diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index f61aaf38e61a48c022d43f1bfcfce65dbbd22ec7..5e92c7b22fd8a34ae2e727c71afdcc15a2fc04ca 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -406,16 +406,11 @@ message Buffer { message BufferState { uint64 id = 1; optional File file = 2; - string visible_text = 3; - string deleted_text = 4; - repeated BufferFragment fragments = 5; - repeated UndoMapEntry undo_map = 6; - repeated VectorClockEntry version = 7; - repeated SelectionSet selections = 8; - repeated Diagnostic diagnostics = 9; - uint32 lamport_timestamp = 10; - repeated Operation deferred_operations = 11; - repeated string completion_triggers = 12; + string base_text = 3; + repeated Operation operations = 4; + repeated SelectionSet selections = 5; + repeated Diagnostic diagnostics = 6; + repeated string completion_triggers = 7; } message BufferFragment { @@ -514,7 +509,9 @@ message Operation { } message UpdateCompletionTriggers { - repeated string triggers = 1; + uint32 replica_id = 1; + uint32 lamport_timestamp = 2; + repeated string triggers = 3; } } diff --git a/crates/text/src/tests.rs b/crates/text/src/tests.rs index b4ac7d46d4f79fa68ccc5fc4213fdf2a453c4b88..0c6c95cb4a4bf5392b51b8fd5261c9e4da429b3e 100644 --- a/crates/text/src/tests.rs +++ b/crates/text/src/tests.rs @@ -550,12 +550,12 @@ fn test_concurrent_edits() { let buf3_op = buffer3.edit(vec![5..6], "56"); assert_eq!(buffer3.text(), "abcde56"); - buffer1.apply_op(Operation::Edit(buf2_op.clone())).unwrap(); - buffer1.apply_op(Operation::Edit(buf3_op.clone())).unwrap(); - buffer2.apply_op(Operation::Edit(buf1_op.clone())).unwrap(); - buffer2.apply_op(Operation::Edit(buf3_op.clone())).unwrap(); - buffer3.apply_op(Operation::Edit(buf1_op.clone())).unwrap(); - buffer3.apply_op(Operation::Edit(buf2_op.clone())).unwrap(); + buffer1.apply_op(buf2_op.clone()).unwrap(); + buffer1.apply_op(buf3_op.clone()).unwrap(); + buffer2.apply_op(buf1_op.clone()).unwrap(); + buffer2.apply_op(buf3_op.clone()).unwrap(); + buffer3.apply_op(buf1_op.clone()).unwrap(); + buffer3.apply_op(buf2_op.clone()).unwrap(); assert_eq!(buffer1.text(), "a12c34e56"); assert_eq!(buffer2.text(), "a12c34e56"); diff --git a/crates/text/src/text.rs b/crates/text/src/text.rs index b1f513c3756d81ab30b65c8f7ada0721c5d205a1..37e5c72402ce937574d17a5057309d540bbe7620 100644 --- a/crates/text/src/text.rs +++ b/crates/text/src/text.rs @@ -130,7 +130,7 @@ impl Transaction { pub struct History { // TODO: Turn this into a String or Rope, maybe. pub base_text: Arc, - ops: HashMap, + operations: HashMap, undo_stack: Vec, redo_stack: Vec, transaction_depth: usize, @@ -142,7 +142,7 @@ impl History { pub fn new(base_text: Arc) -> Self { Self { base_text, - ops: Default::default(), + operations: Default::default(), undo_stack: Vec::new(), redo_stack: Vec::new(), transaction_depth: 0, @@ -151,8 +151,8 @@ impl History { } } - fn push(&mut self, op: EditOperation) { - self.ops.insert(op.timestamp.local(), op); + fn push(&mut self, op: Operation) { + self.operations.insert(op.local_timestamp(), op); } fn start_transaction(&mut self, start: clock::Global, now: Instant) -> Option { @@ -216,7 +216,7 @@ impl History { if let Some(last_transaction) = transactions_to_keep.last_mut() { for transaction in &*transactions_to_merge { for edit_id in &transaction.edits { - last_transaction.push_edit(&self.ops[edit_id]); + last_transaction.push_edit(self.operations[edit_id].as_edit().unwrap()); } } @@ -240,8 +240,11 @@ impl History { assert_eq!(self.transaction_depth, 0); let mut edit_ids = edit_ids.into_iter().peekable(); - if let Some(first_edit_id) = edit_ids.peek() { - let version = self.ops[first_edit_id].version.clone(); + if let Some(first_edit) = edit_ids + .peek() + .and_then(|e| self.operations.get(&e)?.as_edit()) + { + let version = first_edit.version.clone(); self.start_transaction(version, now); for edit_id in edit_ids { self.push_undo(edit_id); @@ -250,10 +253,12 @@ impl History { } } - fn push_undo(&mut self, edit_id: clock::Local) { + fn push_undo(&mut self, op_id: clock::Local) { assert_ne!(self.transaction_depth, 0); - let last_transaction = self.undo_stack.last_mut().unwrap(); - last_transaction.push_edit(&self.ops[&edit_id]); + if let Some(Operation::Edit(edit)) = self.operations.get(&op_id) { + let last_transaction = self.undo_stack.last_mut().unwrap(); + last_transaction.push_edit(&edit); + } } fn pop_undo(&mut self) -> Option<&Transaction> { @@ -545,56 +550,6 @@ impl Buffer { } } - pub fn from_parts( - replica_id: u16, - remote_id: u64, - visible_text: &str, - deleted_text: &str, - undo_map: impl Iterator)>, - fragments: impl ExactSizeIterator, - lamport_timestamp: u32, - version: clock::Global, - ) -> Self { - let visible_text = visible_text.into(); - let deleted_text = deleted_text.into(); - let fragments = SumTree::from_iter(fragments, &None); - let mut insertions = fragments - .iter() - .map(|fragment| InsertionFragment { - timestamp: fragment.insertion_timestamp.local(), - split_offset: fragment.insertion_offset, - fragment_id: fragment.id.clone(), - }) - .collect::>(); - insertions.sort_unstable_by_key(|i| (i.timestamp, i.split_offset)); - Self { - remote_id, - replica_id, - history: History::new("".into()), - deferred_ops: OperationQueue::new(), - deferred_replicas: Default::default(), - local_clock: clock::Local { - replica_id, - value: version.get(replica_id) + 1, - }, - lamport_clock: clock::Lamport { - replica_id, - value: lamport_timestamp, - }, - subscriptions: Default::default(), - edit_id_resolvers: Default::default(), - snapshot: BufferSnapshot { - replica_id, - visible_text, - deleted_text, - undo_map: UndoMap(undo_map.collect()), - fragments, - insertions: SumTree::from_iter(insertions, &()), - version, - }, - } - } - pub fn version(&self) -> clock::Global { self.version.clone() } @@ -619,7 +574,7 @@ impl Buffer { self.history.group_interval } - pub fn edit(&mut self, ranges: R, new_text: T) -> EditOperation + pub fn edit(&mut self, ranges: R, new_text: T) -> Operation where R: IntoIterator, I: ExactSizeIterator>, @@ -640,13 +595,14 @@ impl Buffer { local: self.local_clock.tick().value, lamport: self.lamport_clock.tick().value, }; - let edit = self.apply_local_edit(ranges.into_iter(), new_text, timestamp); + let operation = + Operation::Edit(self.apply_local_edit(ranges.into_iter(), new_text, timestamp)); - self.history.push(edit.clone()); - self.history.push_undo(edit.timestamp.local()); - self.snapshot.version.observe(edit.timestamp.local()); + self.history.push(operation.clone()); + self.history.push_undo(operation.local_timestamp()); + self.snapshot.version.observe(operation.local_timestamp()); self.end_transaction(); - edit + operation } fn apply_local_edit( @@ -814,6 +770,7 @@ impl Buffer { pub fn apply_ops>(&mut self, ops: I) -> Result<()> { let mut deferred_ops = Vec::new(); for op in ops { + self.history.push(op.clone()); if self.can_apply_op(&op) { self.apply_op(op)?; } else { @@ -838,7 +795,6 @@ impl Buffer { ); self.snapshot.version.observe(edit.timestamp.local()); self.resolve_edit(edit.timestamp.local()); - self.history.push(edit); } } Operation::Undo { @@ -1141,10 +1097,6 @@ impl Buffer { Ok(()) } - pub fn deferred_ops(&self) -> impl Iterator { - self.deferred_ops.iter() - } - fn flush_deferred_ops(&mut self) -> Result<()> { self.deferred_replicas.clear(); let mut deferred_ops = Vec::new(); @@ -1205,8 +1157,8 @@ impl Buffer { &self.history.base_text } - pub fn history(&self) -> impl Iterator { - self.history.ops.values() + pub fn history(&self) -> impl Iterator { + self.history.operations.values() } pub fn undo_history(&self) -> impl Iterator { @@ -1271,12 +1223,13 @@ impl Buffer { version: transaction.start.clone(), }; self.apply_undo(&undo)?; - self.snapshot.version.observe(undo.id); - - Ok(Operation::Undo { + let operation = Operation::Undo { undo, lamport_timestamp: self.lamport_clock.tick(), - }) + }; + self.snapshot.version.observe(operation.local_timestamp()); + self.history.push(operation.clone()); + Ok(operation) } pub fn push_transaction( @@ -1403,7 +1356,7 @@ impl Buffer { new_text ); let op = self.edit(old_ranges.iter().cloned(), new_text.as_str()); - (old_ranges, new_text, Operation::Edit(op)) + (old_ranges, new_text, op) } pub fn randomly_undo_redo(&mut self, rng: &mut impl rand::Rng) -> Vec { @@ -2181,6 +2134,20 @@ impl Operation { operation_queue::Operation::lamport_timestamp(self).replica_id } + pub fn local_timestamp(&self) -> clock::Local { + match self { + Operation::Edit(edit) => edit.timestamp.local(), + Operation::Undo { undo, .. } => undo.id, + } + } + + pub fn as_edit(&self) -> Option<&EditOperation> { + match self { + Operation::Edit(edit) => Some(edit), + _ => None, + } + } + pub fn is_edit(&self) -> bool { match self { Operation::Edit { .. } => true,