Wait for additional edits before pushing transaction in remote buffer

Antonio Scandurra created

Change summary

Cargo.lock                    |  1 
crates/language/src/buffer.rs |  2 +
crates/text/Cargo.toml        |  1 
crates/text/src/text.rs       | 38 ++++++++++++++++++++++++++++++++++++
4 files changed, 41 insertions(+), 1 deletion(-)

Detailed changes

Cargo.lock 🔗

@@ -4904,6 +4904,7 @@ dependencies = [
  "lazy_static",
  "log",
  "parking_lot",
+ "postage",
  "rand 0.8.3",
  "smallvec",
  "sum_tree",

crates/language/src/buffer.rs 🔗

@@ -1910,6 +1910,8 @@ impl Buffer {
             );
             cx.spawn(|this, mut cx| async move {
                 let edit_ids = apply_edits.await?;
+                this.update(&mut cx, |this, _| this.text.wait_for_edits(&edit_ids))
+                    .await;
                 if push_to_history {
                     this.update(&mut cx, |this, _| {
                         this.text

crates/text/Cargo.toml 🔗

@@ -18,6 +18,7 @@ arrayvec = "0.7.1"
 lazy_static = "1.4"
 log = "0.4"
 parking_lot = "0.11"
+postage = { version = "0.4.1", features = ["futures-traits"] }
 rand = { version = "0.8.3", optional = true }
 smallvec = { version = "1.6", features = ["union"] }
 

crates/text/src/text.rs 🔗

@@ -21,6 +21,7 @@ use operation_queue::OperationQueue;
 pub use patch::Patch;
 pub use point::*;
 pub use point_utf16::*;
+use postage::{oneshot, prelude::*};
 #[cfg(any(test, feature = "test-support"))]
 pub use random_char_iter::*;
 use rope::TextDimension;
@@ -28,6 +29,7 @@ pub use rope::{Chunks, Rope, TextSummary};
 pub use selection::*;
 use std::{
     cmp::{self, Ordering},
+    future::Future,
     iter::Iterator,
     ops::{self, Deref, Range, Sub},
     str,
@@ -50,6 +52,7 @@ pub struct Buffer {
     local_clock: clock::Local,
     pub lamport_clock: clock::Lamport,
     subscriptions: Topic,
+    edit_id_resolvers: HashMap<clock::Local, Vec<oneshot::Sender<()>>>,
 }
 
 #[derive(Clone, Debug)]
@@ -538,6 +541,7 @@ impl Buffer {
             local_clock,
             lamport_clock,
             subscriptions: Default::default(),
+            edit_id_resolvers: Default::default(),
         }
     }
 
@@ -579,6 +583,7 @@ impl Buffer {
                 value: lamport_timestamp,
             },
             subscriptions: Default::default(),
+            edit_id_resolvers: Default::default(),
             snapshot: BufferSnapshot {
                 replica_id,
                 visible_text,
@@ -833,6 +838,7 @@ impl Buffer {
                         edit.timestamp,
                     );
                     self.snapshot.version.observe(edit.timestamp.local());
+                    self.resolve_edit(edit.timestamp.local());
                     self.history.push(edit);
                 }
             }
@@ -1213,7 +1219,6 @@ impl Buffer {
 
     pub fn undo(&mut self) -> Option<(TransactionId, Operation)> {
         if let Some(transaction) = self.history.pop_undo().cloned() {
-            dbg!(&transaction);
             let transaction_id = transaction.id;
             let op = self.undo_or_redo(transaction).unwrap();
             Some((transaction_id, op))
@@ -1286,6 +1291,37 @@ impl Buffer {
     pub fn subscribe(&mut self) -> Subscription {
         self.subscriptions.subscribe()
     }
+
+    pub fn wait_for_edits(
+        &mut self,
+        edit_ids: &[clock::Local],
+    ) -> impl 'static + Future<Output = ()> {
+        let mut futures = Vec::new();
+        for edit_id in edit_ids {
+            if !self.version.observed(*edit_id) {
+                let (tx, rx) = oneshot::channel();
+                self.edit_id_resolvers.entry(*edit_id).or_default().push(tx);
+                futures.push(rx);
+            }
+        }
+
+        async move {
+            for mut future in futures {
+                future.recv().await;
+            }
+        }
+    }
+
+    fn resolve_edit(&mut self, edit_id: clock::Local) {
+        for mut tx in self
+            .edit_id_resolvers
+            .remove(&edit_id)
+            .into_iter()
+            .flatten()
+        {
+            let _ = tx.try_send(());
+        }
+    }
 }
 
 #[cfg(any(test, feature = "test-support"))]