WIP: Start on `apply_remote_edit`

Antonio Scandurra and Nathan Sobo created

Co-Authored-By: Nathan Sobo <nathan@zed.dev>

Change summary

zed/src/editor/buffer.rs | 314 ++++++++++++++++++-----------------------
1 file changed, 142 insertions(+), 172 deletions(-)

Detailed changes

zed/src/editor/buffer.rs 🔗

@@ -338,6 +338,7 @@ pub struct Insertion {
 #[derive(Eq, PartialEq, Clone, Debug)]
 struct Fragment {
     insertion_id: time::Local,
+    lamport_timestamp: time::Lamport,
     len: usize,
     deletions: HashSet<time::Local>,
     max_undos: time::Global,
@@ -466,6 +467,7 @@ impl Buffer {
             fragments.push(
                 Fragment {
                     insertion_id: Default::default(),
+                    lamport_timestamp: Default::default(),
                     len: visible_text.len(),
                     deletions: Default::default(),
                     max_undos: Default::default(),
@@ -933,7 +935,7 @@ impl Buffer {
 
         let edit_id = self.local_clock.tick();
         let lamport_timestamp = self.lamport_clock.tick();
-        let edit = self.splice_fragments(&ranges, new_text, edit_id, lamport_timestamp);
+        let edit = self.apply_local_edit(&ranges, new_text, edit_id, lamport_timestamp);
 
         self.history.push(edit.clone());
         self.history.push_undo(edit.id);
@@ -1071,13 +1073,13 @@ impl Buffer {
                 ..
             } => {
                 if !self.version.observed(edit.id) {
-                    self.apply_edit(
+                    self.apply_remote_edit(
                         &edit.version,
                         &edit.ranges,
                         edit.new_text.as_deref(),
                         edit.id,
                         lamport_timestamp,
-                    )?;
+                    );
                     self.version.observe(edit.id);
                     self.history.push(edit);
                 }
@@ -1109,171 +1111,137 @@ impl Buffer {
         Ok(())
     }
 
-    fn apply_edit(
+    fn apply_remote_edit(
         &mut self,
         version: &time::Global,
         ranges: &[Range<usize>],
         new_text: Option<&str>,
         local_timestamp: time::Local,
         lamport_timestamp: time::Lamport,
-    ) -> Result<()> {
-        let old_visible_text = mem::take(&mut self.visible_text);
-        let old_deleted_text = mem::take(&mut self.deleted_text);
-        let old_fragments = mem::take(&mut self.fragments);
-        let mut old_fragments = old_fragments.cursor::<VersionedOffset, VersionedOffset>();
-        let old_fragments_cx = Some(version.clone());
-
-        let mut new_fragments = SumTree::new();
-        let mut new_ropes =
-            RopeBuilder::new(old_visible_text.cursor(0), old_deleted_text.cursor(0));
-        let mut ranges = ranges.iter().peekable();
-        let mut fragment_start_offset = 0;
-
-        // Push the fragments that precede the first edit and park the cursor over the fragment
-        // containing the start of the first edit.
-        if let Some(first_range) = ranges.peek() {
-            let prefix_fragments = old_fragments.slice(
-                &VersionedOffset::Offset(first_range.start),
-                SeekBias::Right,
-                &old_fragments_cx,
-            );
-            new_ropes.push_tree(prefix_fragments.summary().text);
-            new_fragments.push_tree(prefix_fragments, &None);
-            fragment_start_offset = old_fragments.start().offset();
+    ) {
+        if ranges.is_empty() {
+            return;
         }
 
-        while let Some(range) = ranges.peek() {
-            let fragment = old_fragments.item();
-            let fragment_end_offset = old_fragments.end(&old_fragments_cx).offset();
+        let cx = Some(version.clone());
+        let mut new_ropes =
+            RopeBuilder::new(self.visible_text.cursor(0), self.deleted_text.cursor(0));
+        let mut old_fragments = self.fragments.cursor::<VersionedOffset, VersionedOffset>();
+        let mut new_fragments = old_fragments.slice(
+            &VersionedOffset::Offset(ranges[0].start),
+            SeekBias::Left,
+            &cx,
+        );
+        new_ropes.push_tree(new_fragments.summary().text);
 
-            if let Some(fragment) = fragment {
-                // Was this fragment visible in the edit's base version? If not, push it into
-                // the new fragments, skip it, and continue the loop.
-                if !version.observed(fragment.insertion_id) {
-                    new_ropes.push_fragment(fragment, fragment.visible);
-                    new_fragments.push(fragment.clone(), &None);
-                    old_fragments.next(&old_fragments_cx);
-                    continue;
+        let mut fragment_start = old_fragments.start().offset();
+        for range in ranges {
+            if range.start > old_fragments.end(&cx).offset() {
+                if old_fragments.end(&cx).offset() > fragment_start {
+                    let mut suffix = old_fragments.item().unwrap().clone();
+                    suffix.len = old_fragments.end(&cx).offset() - fragment_start;
+                    new_ropes.push_fragment(&suffix, suffix.visible);
+                    new_fragments.push(suffix, &None);
+                    old_fragments.next(&cx);
                 }
 
-                // If the current fragment doesn't intersect the current range, push the remainder
-                // of the fragment and then slice to the fragment containing the start of the
-                // current range.
-                if range.start > fragment_end_offset {
-                    if fragment_end_offset > fragment_start_offset {
-                        let suffix = Fragment {
-                            len: fragment_end_offset - fragment_start_offset,
-                            deletions: fragment.deletions.clone(),
-                            max_undos: fragment.max_undos.clone(),
-                            visible: fragment.visible,
-                            insertion_id: fragment.insertion_id,
-                        };
-                        new_ropes.push_fragment(&suffix, fragment.visible);
-                        new_fragments.push(suffix, &None);
-                    }
+                let slice =
+                    old_fragments.slice(&VersionedOffset::Offset(range.start), SeekBias::Left, &cx);
+                new_ropes.push_tree(slice.summary().text);
+                new_fragments.push_tree(slice, &None);
+                fragment_start = old_fragments.start().offset();
+            }
 
-                    let prefix_fragments = old_fragments.slice(
-                        &VersionedOffset::Offset(range.start),
-                        SeekBias::Right,
-                        &old_fragments_cx,
-                    );
-                    new_ropes.push_tree(prefix_fragments.summary().text);
-                    new_fragments.push_tree(prefix_fragments, &None);
-                    fragment_start_offset = old_fragments.start().offset();
+            // If we are at the end of a non-concurrent fragment, advance to the next one.
+            if let Some(fragment) = old_fragments.item() {
+                let fragment_end = old_fragments.end(&cx).offset();
+                if range.start == fragment_end && fragment_end > fragment_start {
+                    let mut fragment = fragment.clone();
+                    fragment.len = fragment_end - fragment_start;
+                    new_ropes.push_fragment(&fragment, fragment.visible);
+                    new_fragments.push(fragment, &None);
+                    old_fragments.next(&cx);
+                    fragment_start = old_fragments.start().offset();
                 }
+            }
 
-                // Now the current range intersects the current fragment.
-                // If there is a piece of the fragment preceding the current range, consume it.
-                if range.start > fragment_start_offset {
-                    let prefix = Fragment {
-                        len: range.start - fragment_start_offset,
-                        deletions: fragment.deletions.clone(),
-                        max_undos: fragment.max_undos.clone(),
-                        visible: fragment.visible,
-                        insertion_id: fragment.insertion_id,
-                    };
-                    fragment_start_offset += prefix.len;
-                    new_ropes.push_fragment(&prefix, fragment.visible);
-                    new_fragments.push(prefix, &None);
+            // Skip over insertions that are concurrent to this edit, but have a lower lamport
+            // timestamp.
+            while let Some(fragment) = old_fragments.item() {
+                if range.start == fragment_start && fragment.lamport_timestamp > lamport_timestamp {
+                    new_ropes.push_fragment(fragment, fragment.visible);
+                    new_fragments.push(fragment.clone(), &None);
+                    old_fragments.next(&cx);
+                    debug_assert_eq!(fragment_start, range.start);
+                } else {
+                    break;
                 }
+            }
+            debug_assert!(fragment_start <= range.start);
+
+            if range.start > fragment_start {
+                let mut prefix = old_fragments.item().unwrap().clone();
+                prefix.len = range.start - fragment_start;
+                fragment_start = range.start;
+                new_ropes.push_fragment(&prefix, prefix.visible);
+                new_fragments.push(prefix, &None);
+            }
 
-                // Push the portion of the current fragment that intersects the current range,
-                // marking it as deleted.
-                if range.end > range.start {
-                    let deleted_end = cmp::min(range.end, fragment_end_offset);
-
-                    let mut deletions = fragment.deletions.clone();
-                    deletions.insert(local_timestamp);
-
-                    let deleted = Fragment {
-                        len: deleted_end - fragment_start_offset,
-                        deletions,
-                        max_undos: fragment.max_undos.clone(),
-                        visible: false,
-                        insertion_id: fragment.insertion_id,
-                    };
-                    fragment_start_offset += deleted.len;
-                    new_ropes.push_fragment(&deleted, fragment.visible);
-                    new_fragments.push(deleted, &None);
-                }
+            if let Some(new_text) = new_text {
+                new_ropes.push_str(new_text);
+                new_fragments.push(
+                    Fragment {
+                        insertion_id: local_timestamp,
+                        lamport_timestamp,
+                        len: new_text.len(),
+                        deletions: Default::default(),
+                        max_undos: Default::default(),
+                        visible: true,
+                    },
+                    &None,
+                );
+            }
 
-                // Push any new text
-                if let Some(new_next) = new_text {
-                    new_ropes.push_str(new_next);
-                    new_fragments.push(
-                        Fragment {
-                            len: new_next.len(),
-                            deletions: Default::default(),
-                            max_undos: Default::default(), // TODO: Is this right?
-                            visible: true,
-                            insertion_id: local_timestamp,
-                        },
-                        &None,
-                    );
+            while range.end > fragment_start {
+                let fragment = old_fragments.item().unwrap();
+                let fragment_end = old_fragments.end(&cx).offset();
+                let mut intersection = fragment.clone();
+                if intersection.was_visible(&version, &self.undo_map) {
+                    let intersection_end = cmp::min(range.end, fragment_end);
+                    intersection.len = intersection_end - fragment_start;
+                    intersection.deletions.insert(local_timestamp);
+                    intersection.visible = false;
+                    fragment_start = intersection_end;
                 }
+                new_ropes.push_fragment(&intersection, fragment.visible);
+                new_fragments.push(intersection, &None);
 
-                // Which ends first? The current fragment or the current range? If the current range
-                // ends before the current fragment, advance to the next range and preserve the
-                // current fragment. Otherwise, advance to next fragment and preserve the current
-                // range.
-                if range.end < fragment_end_offset {
-                    ranges.next();
-                } else {
-                    old_fragments.next(&old_fragments_cx);
-                    fragment_start_offset = fragment_end_offset;
+                if range.end >= fragment_end {
+                    old_fragments.next(&cx);
                 }
-            } else {
-                // Push a fragment containing the new text
             }
         }
 
-        if let Some(fragment) = old_fragments.item() {
-            let fragment_end_offset = old_fragments.end(&old_fragments_cx).offset();
-            if fragment_end_offset > fragment_start_offset {
-                let suffix = Fragment {
-                    len: fragment_end_offset - fragment_start_offset,
-                    deletions: fragment.deletions.clone(),
-                    max_undos: fragment.max_undos.clone(),
-                    visible: fragment.visible,
-                    insertion_id: fragment.insertion_id,
-                };
-                new_ropes.push_fragment(&suffix, fragment.visible);
-                new_fragments.push(suffix, &None);
-            }
-
-            let suffix_fragments = old_fragments.suffix(&old_fragments_cx);
-            new_ropes.push_tree(suffix_fragments.summary().text);
-            new_fragments.push_tree(suffix_fragments, &None);
+        let fragment_end = old_fragments.end(&cx).offset();
+        if fragment_end > fragment_start {
+            let mut suffix = old_fragments.item().unwrap().clone();
+            suffix.len = fragment_end - fragment_start;
+            new_ropes.push_fragment(&suffix, suffix.visible);
+            new_fragments.push(suffix, &None);
+            old_fragments.next(&cx);
         }
 
+        let suffix = old_fragments.suffix(&cx);
+        new_ropes.push_tree(suffix.summary().text);
+        new_fragments.push_tree(suffix, &None);
         let (visible_text, deleted_text) = new_ropes.finish();
+        drop(old_fragments);
 
         self.fragments = new_fragments;
         self.visible_text = visible_text;
         self.deleted_text = deleted_text;
         self.local_clock.observe(local_timestamp);
         self.lamport_clock.observe(lamport_timestamp);
-        Ok(())
     }
 
     pub fn undo(&mut self, mut cx: Option<&mut ModelContext<Self>>) -> Vec<Operation> {
@@ -1429,39 +1397,34 @@ impl Buffer {
     }
 
     fn can_apply_op(&self, op: &Operation) -> bool {
-        true
-        // if self.deferred_replicas.contains(&op.replica_id()) {
-        //     false
-        // } else {
-        //     match op {
-        //         Operation::Edit { edit, .. } => {
-        //             self.version.observed(edit.start_id)
-        //                 && self.version.observed(edit.end_id)
-        //                 && edit.version_in_range <= self.version
-        //         }
-        //         Operation::Undo { undo, .. } => self.version.observed(undo.edit_id),
-        //         Operation::UpdateSelections { selections, .. } => {
-        //             if let Some(selections) = selections {
-        //                 selections.iter().all(|selection| {
-        //                     let contains_start = match &selection.start {
-        //                         Anchor::Middle { version, .. } => self.version >= *version,
-        //                         _ => true,
-        //                     };
-        //                     let contains_end = match &selection.end {
-        //                         Anchor::Middle { version, .. } => self.version >= *version,
-        //                         _ => true,
-        //                     };
-        //                     contains_start && contains_end
-        //                 })
-        //             } else {
-        //                 true
-        //             }
-        //         }
-        //     }
-        // }
-    }
-
-    fn splice_fragments(
+        if self.deferred_replicas.contains(&op.replica_id()) {
+            false
+        } else {
+            match op {
+                Operation::Edit { edit, .. } => self.version >= edit.version,
+                Operation::Undo { undo, .. } => self.version.observed(undo.edit_id),
+                Operation::UpdateSelections { selections, .. } => {
+                    if let Some(selections) = selections {
+                        selections.iter().all(|selection| {
+                            let contains_start = match &selection.start {
+                                Anchor::Middle { version, .. } => self.version >= *version,
+                                _ => true,
+                            };
+                            let contains_end = match &selection.end {
+                                Anchor::Middle { version, .. } => self.version >= *version,
+                                _ => true,
+                            };
+                            contains_start && contains_end
+                        })
+                    } else {
+                        true
+                    }
+                }
+            }
+        }
+    }
+
+    fn apply_local_edit(
         &mut self,
         old_ranges: &[Range<usize>],
         new_text: Option<String>,
@@ -1522,8 +1485,9 @@ impl Buffer {
 
                     if let Some(new_text) = new_text.clone() {
                         let new_fragment = Fragment {
-                            len: new_text.len(),
                             insertion_id: edit_id,
+                            lamport_timestamp,
+                            len: new_text.len(),
                             deletions: Default::default(),
                             max_undos: Default::default(),
                             visible: true,
@@ -1626,8 +1590,9 @@ impl Buffer {
 
             if let Some(new_text) = new_text {
                 let new_fragment = Fragment {
-                    len: new_text.len(),
                     insertion_id: edit_id,
+                    lamport_timestamp,
+                    len: new_text.len(),
                     deletions: Default::default(),
                     max_undos: Default::default(),
                     visible: true,
@@ -3192,7 +3157,12 @@ mod tests {
             let first_buffer = buffers[0].read(cx);
             for buffer in &buffers[1..] {
                 let buffer = buffer.read(cx);
-                assert_eq!(buffer.text(), first_buffer.text());
+                assert_eq!(
+                    buffer.text(),
+                    first_buffer.text(),
+                    "Replica {} text != Replica 0 text",
+                    buffer.replica_id
+                );
                 assert_eq!(
                     buffer.all_selections().collect::<HashMap<_, _>>(),
                     first_buffer.all_selections().collect::<HashMap<_, _>>()