Introduce `text::Buffer::subscribe`

Antonio Scandurra and Nathan Sobo created

Co-Authored-By: Nathan Sobo <nathan@zed.dev>

Change summary

Cargo.lock                                |   3 
crates/editor/src/display_map/wrap_map.rs |  12 -
crates/language/src/buffer.rs             |  27 -----
crates/text/Cargo.toml                    |   3 
crates/text/src/patch.rs                  |  21 +--
crates/text/src/tests.rs                  |  36 +++++-
crates/text/src/text.rs                   | 130 ++++++++++++++++++++----
7 files changed, 159 insertions(+), 73 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -4849,8 +4849,11 @@ dependencies = [
  "arrayvec 0.7.1",
  "clock",
  "collections",
+ "ctor",
+ "env_logger",
  "gpui",
  "log",
+ "parking_lot",
  "rand 0.8.3",
  "smallvec",
  "sum_tree",

crates/editor/src/display_map/wrap_map.rs 🔗

@@ -204,12 +204,10 @@ impl WrapMap {
             }
             let new_rows = self.snapshot.transforms.summary().output.lines.row + 1;
             self.snapshot.interpolated = false;
-            self.edits_since_sync = self.edits_since_sync.compose(&unsafe {
-                Patch::new_unchecked(vec![Edit {
-                    old: 0..old_rows,
-                    new: 0..new_rows,
-                }])
-            });
+            self.edits_since_sync = self.edits_since_sync.compose(&Patch::new(vec![Edit {
+                old: 0..old_rows,
+                new: 0..new_rows,
+            }]));
         }
     }
 
@@ -559,7 +557,7 @@ impl Snapshot {
         }
 
         consolidate_wrap_edits(&mut wrap_edits);
-        unsafe { Patch::new_unchecked(wrap_edits) }
+        Patch::new(wrap_edits)
     }
 
     pub fn text_chunks(&self, wrap_row: u32) -> impl Iterator<Item = &str> {

crates/language/src/buffer.rs 🔗

@@ -1322,8 +1322,7 @@ impl Buffer {
         was_dirty: bool,
         cx: &mut ModelContext<Self>,
     ) {
-        let patch =
-            unsafe { Patch::new_unchecked(self.edits_since::<usize>(old_version).collect()) };
+        let patch = Patch::new(self.edits_since::<usize>(old_version).collect());
         if patch.is_empty() {
             return;
         }
@@ -1502,30 +1501,6 @@ impl Entity for Buffer {
     }
 }
 
-// TODO: Do we need to clone a buffer?
-impl Clone for Buffer {
-    fn clone(&self) -> Self {
-        Self {
-            text: self.text.clone(),
-            saved_version: self.saved_version.clone(),
-            saved_mtime: self.saved_mtime,
-            file: self.file.as_ref().map(|f| f.boxed_clone()),
-            language: self.language.clone(),
-            syntax_tree: Mutex::new(self.syntax_tree.lock().clone()),
-            parsing_in_background: false,
-            sync_parse_timeout: self.sync_parse_timeout,
-            parse_count: self.parse_count,
-            autoindent_requests: Default::default(),
-            pending_autoindent: Default::default(),
-            diagnostics: self.diagnostics.clone(),
-            diagnostics_update_count: self.diagnostics_update_count,
-            language_server: None,
-            #[cfg(test)]
-            operations: self.operations.clone(),
-        }
-    }
-}
-
 impl Deref for Buffer {
     type Target = TextBuffer;
 

crates/text/Cargo.toml 🔗

@@ -16,10 +16,13 @@ sum_tree = { path = "../sum_tree" }
 anyhow = "1.0.38"
 arrayvec = "0.7.1"
 log = "0.4"
+parking_lot = "0.11"
 rand = { version = "0.8.3", optional = true }
 smallvec = { version = "1.6", features = ["union"] }
 
 [dev-dependencies]
 collections = { path = "../collections", features = ["test-support"] }
 gpui = { path = "../gpui", features = ["test-support"] }
+ctor = "0.1"
+env_logger = "0.8"
 rand = "0.8.3"

crates/text/src/patch.rs 🔗

@@ -18,18 +18,17 @@ where
         + Default
         + PartialEq,
 {
-    pub unsafe fn new_unchecked(edits: Vec<Edit<T>>) -> Self {
-        Self(edits)
-    }
-
     pub fn new(edits: Vec<Edit<T>>) -> Self {
-        let mut last_edit: Option<&Edit<T>> = None;
-        for edit in &edits {
-            if let Some(last_edit) = last_edit {
-                assert!(edit.old.start > last_edit.old.end);
-                assert!(edit.new.start > last_edit.new.end);
+        #[cfg(debug_assertions)]
+        {
+            let mut last_edit: Option<&Edit<T>> = None;
+            for edit in &edits {
+                if let Some(last_edit) = last_edit {
+                    assert!(edit.old.start > last_edit.old.end);
+                    assert!(edit.new.start > last_edit.new.end);
+                }
+                last_edit = Some(edit);
             }
-            last_edit = Some(edit);
         }
         Self(edits)
     }
@@ -179,7 +178,7 @@ where
         self.0.is_empty()
     }
 
-    fn push(&mut self, edit: Edit<T>) {
+    pub fn push(&mut self, edit: Edit<T>) {
         if edit.is_empty() {
             return;
         }

crates/text/src/tests.rs 🔗

@@ -8,6 +8,13 @@ use std::{
     time::{Duration, Instant},
 };
 
+#[cfg(test)]
+#[ctor::ctor]
+fn init_logger() {
+    // std::env::set_var("RUST_LOG", "info");
+    env_logger::init();
+}
+
 #[test]
 fn test_edit() {
     let mut buffer = Buffer::new(0, 0, History::new("abc".into()));
@@ -72,30 +79,43 @@ fn test_random_edits(mut rng: StdRng) {
         );
 
         if rng.gen_bool(0.3) {
-            buffer_versions.push(buffer.clone());
+            buffer_versions.push((buffer.clone(), buffer.subscribe()));
         }
     }
 
-    for mut old_buffer in buffer_versions {
+    for (old_buffer, subscription) in buffer_versions {
         let edits = buffer
             .edits_since::<usize>(&old_buffer.version)
             .collect::<Vec<_>>();
 
         log::info!(
-            "mutating old buffer version {:?}, text: {:?}, edits since: {:?}",
+            "applying edits since version {:?} to old text: {:?}: {:?}",
             old_buffer.version(),
             old_buffer.text(),
             edits,
         );
 
+        let mut text = old_buffer.visible_text.clone();
         for edit in edits {
             let new_text: String = buffer.text_for_range(edit.new.clone()).collect();
-            old_buffer.edit(
-                Some(edit.new.start..edit.new.start + edit.old.len()),
-                new_text,
-            );
+            text.replace(edit.new.start..edit.new.start + edit.old.len(), &new_text);
+        }
+        assert_eq!(text.to_string(), buffer.text());
+
+        let subscription_edits = subscription.consume();
+        log::info!(
+            "applying subscription edits since version {:?} to old text: {:?}: {:?}",
+            old_buffer.version(),
+            old_buffer.text(),
+            subscription_edits,
+        );
+
+        let mut text = old_buffer.visible_text.clone();
+        for edit in subscription_edits.into_inner() {
+            let new_text: String = buffer.text_for_range(edit.new.clone()).collect();
+            text.replace(edit.new.start..edit.new.start + edit.old.len(), &new_text);
         }
-        assert_eq!(old_buffer.text(), buffer.text());
+        assert_eq!(text.to_string(), buffer.text());
     }
 }
 

crates/text/src/text.rs 🔗

@@ -15,6 +15,7 @@ use anyhow::{anyhow, Result};
 use clock::ReplicaId;
 use collections::{HashMap, HashSet};
 use operation_queue::OperationQueue;
+use parking_lot::Mutex;
 pub use patch::Patch;
 pub use point::*;
 pub use point_utf16::*;
@@ -28,13 +29,12 @@ use std::{
     iter::Iterator,
     ops::{self, Deref, Range, Sub},
     str,
-    sync::Arc,
+    sync::{Arc, Weak},
     time::{Duration, Instant},
 };
 pub use sum_tree::Bias;
 use sum_tree::{FilterCursor, SumTree};
 
-#[derive(Clone)]
 pub struct Buffer {
     snapshot: Snapshot,
     last_edit: clock::Local,
@@ -46,6 +46,7 @@ pub struct Buffer {
     remote_id: u64,
     local_clock: clock::Local,
     lamport_clock: clock::Lamport,
+    subscriptions: Vec<Weak<Subscription>>,
 }
 
 #[derive(Clone)]
@@ -341,6 +342,25 @@ impl<D1, D2> Edit<(D1, D2)> {
     }
 }
 
+#[derive(Default)]
+pub struct Subscription(Arc<Mutex<Vec<Patch<usize>>>>);
+
+impl Subscription {
+    pub fn consume(&self) -> Patch<usize> {
+        let mut patches = self.0.lock();
+        let mut changes = Patch::default();
+        for patch in patches.drain(..) {
+            changes = changes.compose(&patch);
+        }
+        changes
+    }
+
+    pub fn publish(&self, patch: Patch<usize>) {
+        let mut changes = self.0.lock();
+        changes.push(patch);
+    }
+}
+
 #[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
 pub struct InsertionTimestamp {
     pub replica_id: ReplicaId,
@@ -473,13 +493,14 @@ impl Buffer {
             },
             last_edit: clock::Local::default(),
             history,
-            selections: HashMap::default(),
+            selections: Default::default(),
             deferred_ops: OperationQueue::new(),
             deferred_replicas: HashSet::default(),
             replica_id,
             remote_id,
             local_clock,
             lamport_clock,
+            subscriptions: Default::default(),
         }
     }
 
@@ -546,7 +567,8 @@ impl Buffer {
         new_text: Option<String>,
         timestamp: InsertionTimestamp,
     ) -> EditOperation {
-        let mut edit = EditOperation {
+        let mut edits = Patch::default();
+        let mut edit_op = EditOperation {
             timestamp,
             version: self.version(),
             ranges: Vec::with_capacity(ranges.len()),
@@ -602,6 +624,11 @@ impl Buffer {
 
             // Insert the new text before any existing fragments within the range.
             if let Some(new_text) = new_text.as_deref() {
+                let new_start = new_fragments.summary().text.visible;
+                edits.push(Edit {
+                    old: fragment_start..fragment_start,
+                    new: new_start..new_start + new_text.len(),
+                });
                 new_ropes.push_str(new_text);
                 new_fragments.push(
                     Fragment {
@@ -628,6 +655,13 @@ impl Buffer {
                     intersection.visible = false;
                 }
                 if intersection.len > 0 {
+                    if fragment.visible && !intersection.visible {
+                        let new_start = new_fragments.summary().text.visible;
+                        edits.push(Edit {
+                            old: fragment_start..intersection_end,
+                            new: new_start..new_start,
+                        });
+                    }
                     new_ropes.push_fragment(&intersection, fragment.visible);
                     new_fragments.push(intersection, &None);
                     fragment_start = intersection_end;
@@ -638,7 +672,7 @@ impl Buffer {
             }
 
             let full_range_end = FullOffset(range.end + old_fragments.start().deleted);
-            edit.ranges.push(full_range_start..full_range_end);
+            edit_op.ranges.push(full_range_start..full_range_end);
         }
 
         // If the current fragment has been partially consumed, then consume the rest of it
@@ -663,8 +697,9 @@ impl Buffer {
         self.snapshot.fragments = new_fragments;
         self.snapshot.visible_text = visible_text;
         self.snapshot.deleted_text = deleted_text;
-        edit.new_text = new_text;
-        edit
+        self.update_subscriptions(edits);
+        edit_op.new_text = new_text;
+        edit_op
     }
 
     pub fn apply_ops<I: IntoIterator<Item = Operation>>(&mut self, ops: I) -> Result<()> {
@@ -764,10 +799,11 @@ impl Buffer {
             return;
         }
 
+        let mut edits = Patch::default();
         let cx = Some(version.clone());
         let mut new_ropes =
             RopeBuilder::new(self.visible_text.cursor(0), self.deleted_text.cursor(0));
-        let mut old_fragments = self.fragments.cursor::<VersionedFullOffset>();
+        let mut old_fragments = self.fragments.cursor::<(VersionedFullOffset, usize)>();
         let mut new_fragments = old_fragments.slice(
             &VersionedFullOffset::Offset(ranges[0].start),
             Bias::Left,
@@ -775,16 +811,16 @@ impl Buffer {
         );
         new_ropes.push_tree(new_fragments.summary().text);
 
-        let mut fragment_start = old_fragments.start().full_offset();
+        let mut fragment_start = old_fragments.start().0.full_offset();
         for range in ranges {
-            let fragment_end = old_fragments.end(&cx).full_offset();
+            let fragment_end = old_fragments.end(&cx).0.full_offset();
 
             // If the current fragment ends before this range, then jump ahead to the first fragment
             // that extends past the start of this range, reusing any intervening fragments.
             if fragment_end < range.start {
                 // If the current fragment has been partially consumed, then consume the rest of it
                 // and advance to the next fragment before slicing.
-                if fragment_start > old_fragments.start().full_offset() {
+                if fragment_start > old_fragments.start().0.full_offset() {
                     if fragment_end > fragment_start {
                         let mut suffix = old_fragments.item().unwrap().clone();
                         suffix.len = fragment_end.0 - fragment_start.0;
@@ -798,18 +834,18 @@ impl Buffer {
                     old_fragments.slice(&VersionedFullOffset::Offset(range.start), Bias::Left, &cx);
                 new_ropes.push_tree(slice.summary().text);
                 new_fragments.push_tree(slice, &None);
-                fragment_start = old_fragments.start().full_offset();
+                fragment_start = old_fragments.start().0.full_offset();
             }
 
             // If we are at the end of a non-concurrent fragment, advance to the next one.
-            let fragment_end = old_fragments.end(&cx).full_offset();
+            let fragment_end = old_fragments.end(&cx).0.full_offset();
             if fragment_end == range.start && fragment_end > fragment_start {
                 let mut fragment = old_fragments.item().unwrap().clone();
                 fragment.len = fragment_end.0 - fragment_start.0;
                 new_ropes.push_fragment(&fragment, fragment.visible);
                 new_fragments.push(fragment, &None);
                 old_fragments.next(&cx);
-                fragment_start = old_fragments.start().full_offset();
+                fragment_start = old_fragments.start().0.full_offset();
             }
 
             // Skip over insertions that are concurrent to this edit, but have a lower lamport
@@ -839,6 +875,15 @@ impl Buffer {
 
             // Insert the new text before any existing fragments within the range.
             if let Some(new_text) = new_text {
+                let mut old_start = old_fragments.start().1;
+                if old_fragments.item().map_or(false, |f| f.visible) {
+                    old_start += fragment_start.0 - old_fragments.start().0.full_offset().0;
+                }
+                let new_start = new_fragments.summary().text.visible;
+                edits.push(Edit {
+                    old: old_start..old_start,
+                    new: new_start..new_start + new_text.len(),
+                });
                 new_ropes.push_str(new_text);
                 new_fragments.push(
                     Fragment {
@@ -856,7 +901,7 @@ impl Buffer {
             // portions as deleted.
             while fragment_start < range.end {
                 let fragment = old_fragments.item().unwrap();
-                let fragment_end = old_fragments.end(&cx).full_offset();
+                let fragment_end = old_fragments.end(&cx).0.full_offset();
                 let mut intersection = fragment.clone();
                 let intersection_end = cmp::min(range.end, fragment_end);
                 if fragment.was_visible(version, &self.undo_map) {
@@ -865,6 +910,15 @@ impl Buffer {
                     intersection.visible = false;
                 }
                 if intersection.len > 0 {
+                    if fragment.visible && !intersection.visible {
+                        let old_start = old_fragments.start().1
+                            + (fragment_start.0 - old_fragments.start().0.full_offset().0);
+                        let new_start = new_fragments.summary().text.visible;
+                        edits.push(Edit {
+                            old: old_start..old_start + intersection.len,
+                            new: new_start..new_start,
+                        });
+                    }
                     new_ropes.push_fragment(&intersection, fragment.visible);
                     new_fragments.push(intersection, &None);
                     fragment_start = intersection_end;
@@ -877,8 +931,8 @@ impl Buffer {
 
         // If the current fragment has been partially consumed, then consume the rest of it
         // and advance to the next fragment before slicing.
-        if fragment_start > old_fragments.start().full_offset() {
-            let fragment_end = old_fragments.end(&cx).full_offset();
+        if fragment_start > old_fragments.start().0.full_offset() {
+            let fragment_end = old_fragments.end(&cx).0.full_offset();
             if fragment_end > fragment_start {
                 let mut suffix = old_fragments.item().unwrap().clone();
                 suffix.len = fragment_end.0 - fragment_start.0;
@@ -899,9 +953,11 @@ impl Buffer {
         self.snapshot.deleted_text = deleted_text;
         self.local_clock.observe(timestamp.local());
         self.lamport_clock.observe(timestamp.lamport());
+        self.update_subscriptions(edits);
     }
 
     fn apply_undo(&mut self, undo: &UndoOperation) -> Result<()> {
+        let mut edits = Patch::default();
         self.snapshot.undo_map.insert(undo);
 
         let mut cx = undo.version.clone();
@@ -910,7 +966,7 @@ impl Buffer {
         }
         let cx = Some(cx);
 
-        let mut old_fragments = self.fragments.cursor::<VersionedFullOffset>();
+        let mut old_fragments = self.fragments.cursor::<(VersionedFullOffset, usize)>();
         let mut new_fragments = old_fragments.slice(
             &VersionedFullOffset::Offset(undo.ranges[0].start),
             Bias::Right,
@@ -921,7 +977,7 @@ impl Buffer {
         new_ropes.push_tree(new_fragments.summary().text);
 
         for range in &undo.ranges {
-            let mut end_offset = old_fragments.end(&cx).full_offset();
+            let mut end_offset = old_fragments.end(&cx).0.full_offset();
 
             if end_offset < range.start {
                 let preceding_fragments = old_fragments.slice(
@@ -944,11 +1000,25 @@ impl Buffer {
                         fragment.visible = fragment.is_visible(&self.undo_map);
                         fragment.max_undos.observe(undo.id);
                     }
+
+                    let old_start = old_fragments.start().1;
+                    let new_start = new_fragments.summary().text.visible;
+                    if fragment_was_visible && !fragment.visible {
+                        edits.push(Edit {
+                            old: old_start..old_start + fragment.len,
+                            new: new_start..new_start,
+                        });
+                    } else if !fragment_was_visible && fragment.visible {
+                        edits.push(Edit {
+                            old: old_start..old_start,
+                            new: new_start..new_start + fragment.len,
+                        });
+                    }
                     new_ropes.push_fragment(&fragment, fragment_was_visible);
                     new_fragments.push(fragment, &None);
 
                     old_fragments.next(&cx);
-                    if end_offset == old_fragments.end(&cx).full_offset() {
+                    if end_offset == old_fragments.end(&cx).0.full_offset() {
                         let unseen_fragments = old_fragments.slice(
                             &VersionedFullOffset::Offset(end_offset),
                             Bias::Right,
@@ -957,7 +1027,7 @@ impl Buffer {
                         new_ropes.push_tree(unseen_fragments.summary().text);
                         new_fragments.push_tree(unseen_fragments, &None);
                     }
-                    end_offset = old_fragments.end(&cx).full_offset();
+                    end_offset = old_fragments.end(&cx).0.full_offset();
                 } else {
                     break;
                 }
@@ -973,6 +1043,7 @@ impl Buffer {
         self.snapshot.fragments = new_fragments;
         self.snapshot.visible_text = visible_text;
         self.snapshot.deleted_text = deleted_text;
+        self.update_subscriptions(edits);
         Ok(())
     }
 
@@ -1129,6 +1200,23 @@ impl Buffer {
         })
     }
 
+    pub fn subscribe(&mut self) -> Arc<Subscription> {
+        let subscription = Arc::new(Default::default());
+        self.subscriptions.push(Arc::downgrade(&subscription));
+        subscription
+    }
+
+    fn update_subscriptions(&mut self, edits: Patch<usize>) {
+        self.subscriptions.retain(|subscription| {
+            if let Some(subscription) = subscription.upgrade() {
+                subscription.publish(edits.clone());
+                true
+            } else {
+                false
+            }
+        });
+    }
+
     pub fn selection_set(&self, set_id: SelectionSetId) -> Result<&SelectionSet> {
         self.selections
             .get(&set_id)