Simplify buffer_operations schema

Max Brunsfeld and Mikayla created

Co-authored-by: Mikayla <mikayla@zed.dev>

Change summary

crates/collab/migrations.sqlite/20221109000000_test_schema.sql  |   3 
crates/collab/migrations/20230819154600_add_channel_buffers.sql |   3 
crates/collab/src/db/queries/buffers.rs                         | 278 +-
crates/collab/src/db/tables/buffer_operation.rs                 |   3 
4 files changed, 121 insertions(+), 166 deletions(-)

Detailed changes

crates/collab/migrations.sqlite/20221109000000_test_schema.sql 🔗

@@ -222,9 +222,6 @@ CREATE TABLE "buffer_operations" (
     "epoch" INTEGER NOT NULL,
     "replica_id" INTEGER NOT NULL,
     "lamport_timestamp" INTEGER NOT NULL,
-    "local_timestamp" INTEGER NOT NULL,
-    "version" BLOB NOT NULL,
-    "is_undo" BOOLEAN NOT NULL,
     "value" BLOB NOT NULL,
     PRIMARY KEY(buffer_id, epoch, lamport_timestamp, replica_id)
 );

crates/collab/migrations/20230819154600_add_channel_buffers.sql 🔗

@@ -10,10 +10,7 @@ CREATE TABLE "buffer_operations" (
     "buffer_id" INTEGER NOT NULL REFERENCES buffers (id) ON DELETE CASCADE,
     "epoch" INTEGER NOT NULL,
     "replica_id" INTEGER NOT NULL,
-    "local_timestamp" INTEGER NOT NULL,
     "lamport_timestamp" INTEGER NOT NULL,
-    "version" BYTEA NOT NULL,
-    "is_undo" BOOLEAN NOT NULL,
     "value" BYTEA NOT NULL,
     PRIMARY KEY(buffer_id, epoch, lamport_timestamp, replica_id)
 );

crates/collab/src/db/queries/buffers.rs 🔗

@@ -1,6 +1,5 @@
 use super::*;
 use prost::Message;
-use std::ops::Range;
 use text::{EditOperation, InsertionTimestamp, UndoOperation};
 
 impl Database {
@@ -234,6 +233,7 @@ impl Database {
             let serialization_version: i32 = buffer
                 .find_related(buffer_snapshot::Entity)
                 .select_only()
+                .column(buffer_snapshot::Column::OperationSerializationVersion)
                 .filter(buffer_snapshot::Column::Epoch.eq(buffer.epoch))
                 .into_values::<_, QueryVersion>()
                 .one(&*tx)
@@ -326,11 +326,7 @@ impl Database {
         let mut text_buffer = text::Buffer::new(0, 0, base_text);
 
         text_buffer
-            .apply_ops(
-                operations
-                    .into_iter()
-                    .filter_map(deserialize_wire_operation),
-            )
+            .apply_ops(operations.into_iter().filter_map(operation_from_wire))
             .unwrap();
 
         let base_text = text_buffer.text();
@@ -363,71 +359,122 @@ fn operation_to_storage(
     buffer: &buffer::Model,
     _format: i32,
 ) -> Option<buffer_operation::ActiveModel> {
-    match operation.variant.as_ref()? {
-        proto::operation::Variant::Edit(operation) => {
-            let value = edit_operation_to_storage(&operation.ranges, &operation.new_text);
-            let version = version_to_storage(&operation.version);
-            Some(buffer_operation::ActiveModel {
-                buffer_id: ActiveValue::Set(buffer.id),
-                epoch: ActiveValue::Set(buffer.epoch),
-                replica_id: ActiveValue::Set(operation.replica_id as i32),
-                lamport_timestamp: ActiveValue::Set(operation.lamport_timestamp as i32),
-                local_timestamp: ActiveValue::Set(operation.local_timestamp as i32),
-                is_undo: ActiveValue::Set(false),
-                version: ActiveValue::Set(version),
-                value: ActiveValue::Set(value),
-            })
-        }
-        proto::operation::Variant::Undo(operation) => {
-            let value = undo_operation_to_storage(&operation.counts);
-            let version = version_to_storage(&operation.version);
-            Some(buffer_operation::ActiveModel {
-                buffer_id: ActiveValue::Set(buffer.id),
-                epoch: ActiveValue::Set(buffer.epoch),
-                replica_id: ActiveValue::Set(operation.replica_id as i32),
-                lamport_timestamp: ActiveValue::Set(operation.lamport_timestamp as i32),
-                local_timestamp: ActiveValue::Set(operation.local_timestamp as i32),
-                is_undo: ActiveValue::Set(true),
-                version: ActiveValue::Set(version),
-                value: ActiveValue::Set(value),
-            })
-        }
-        proto::operation::Variant::UpdateSelections(_) => None,
-        proto::operation::Variant::UpdateDiagnostics(_) => None,
-        proto::operation::Variant::UpdateCompletionTriggers(_) => None,
-    }
+    let (replica_id, lamport_timestamp, value) = match operation.variant.as_ref()? {
+        proto::operation::Variant::Edit(operation) => (
+            operation.replica_id,
+            operation.lamport_timestamp,
+            storage::Operation {
+                local_timestamp: operation.local_timestamp,
+                version: version_to_storage(&operation.version),
+                is_undo: false,
+                edit_ranges: operation
+                    .ranges
+                    .iter()
+                    .map(|range| storage::Range {
+                        start: range.start,
+                        end: range.end,
+                    })
+                    .collect(),
+                edit_texts: operation.new_text.clone(),
+                undo_counts: Vec::new(),
+            },
+        ),
+        proto::operation::Variant::Undo(operation) => (
+            operation.replica_id,
+            operation.lamport_timestamp,
+            storage::Operation {
+                local_timestamp: operation.local_timestamp,
+                version: version_to_storage(&operation.version),
+                is_undo: true,
+                edit_ranges: Vec::new(),
+                edit_texts: Vec::new(),
+                undo_counts: operation
+                    .counts
+                    .iter()
+                    .map(|entry| storage::UndoCount {
+                        replica_id: entry.replica_id,
+                        local_timestamp: entry.local_timestamp,
+                        count: entry.count,
+                    })
+                    .collect(),
+            },
+        ),
+        _ => None?,
+    };
+
+    Some(buffer_operation::ActiveModel {
+        buffer_id: ActiveValue::Set(buffer.id),
+        epoch: ActiveValue::Set(buffer.epoch),
+        replica_id: ActiveValue::Set(replica_id as i32),
+        lamport_timestamp: ActiveValue::Set(lamport_timestamp as i32),
+        value: ActiveValue::Set(value.encode_to_vec()),
+    })
 }
 
 fn operation_from_storage(
     row: buffer_operation::Model,
     _format_version: i32,
 ) -> Result<proto::operation::Variant, Error> {
-    let version = version_from_storage(&row.version)?;
-    let operation = if row.is_undo {
-        let counts = undo_operation_from_storage(&row.value)?;
+    let operation =
+        storage::Operation::decode(row.value.as_slice()).map_err(|error| anyhow!("{}", error))?;
+    let version = version_from_storage(&operation.version);
+    Ok(if operation.is_undo {
         proto::operation::Variant::Undo(proto::operation::Undo {
             replica_id: row.replica_id as u32,
-            local_timestamp: row.local_timestamp as u32,
+            local_timestamp: operation.local_timestamp as u32,
             lamport_timestamp: row.lamport_timestamp as u32,
             version,
-            counts,
+            counts: operation
+                .undo_counts
+                .iter()
+                .map(|entry| proto::UndoCount {
+                    replica_id: entry.replica_id,
+                    local_timestamp: entry.local_timestamp,
+                    count: entry.count,
+                })
+                .collect(),
         })
     } else {
-        let (ranges, new_text) = edit_operation_from_storage(&row.value)?;
         proto::operation::Variant::Edit(proto::operation::Edit {
             replica_id: row.replica_id as u32,
-            local_timestamp: row.local_timestamp as u32,
+            local_timestamp: operation.local_timestamp as u32,
             lamport_timestamp: row.lamport_timestamp as u32,
             version,
-            ranges,
-            new_text,
+            ranges: operation
+                .edit_ranges
+                .into_iter()
+                .map(|range| proto::Range {
+                    start: range.start,
+                    end: range.end,
+                })
+                .collect(),
+            new_text: operation.edit_texts,
         })
-    };
-    Ok(operation)
+    })
+}
+
+fn version_to_storage(version: &Vec<proto::VectorClockEntry>) -> Vec<storage::VectorClockEntry> {
+    version
+        .iter()
+        .map(|entry| storage::VectorClockEntry {
+            replica_id: entry.replica_id,
+            timestamp: entry.timestamp,
+        })
+        .collect()
+}
+
+fn version_from_storage(version: &Vec<storage::VectorClockEntry>) -> Vec<proto::VectorClockEntry> {
+    version
+        .iter()
+        .map(|entry| proto::VectorClockEntry {
+            replica_id: entry.replica_id,
+            timestamp: entry.timestamp,
+        })
+        .collect()
 }
 
 // This is currently a manual copy of the deserialization code in the client's langauge crate
-pub fn deserialize_wire_operation(operation: proto::Operation) -> Option<text::Operation> {
+pub fn operation_from_wire(operation: proto::Operation) -> Option<text::Operation> {
     match operation.variant? {
         proto::operation::Variant::Edit(edit) => Some(text::Operation::Edit(EditOperation {
             timestamp: InsertionTimestamp {
@@ -435,8 +482,14 @@ pub fn deserialize_wire_operation(operation: proto::Operation) -> Option<text::O
                 local: edit.local_timestamp,
                 lamport: edit.lamport_timestamp,
             },
-            version: deserialize_wire_version(&edit.version),
-            ranges: edit.ranges.into_iter().map(deserialize_range).collect(),
+            version: version_from_wire(&edit.version),
+            ranges: edit
+                .ranges
+                .into_iter()
+                .map(|range| {
+                    text::FullOffset(range.start as usize)..text::FullOffset(range.end as usize)
+                })
+                .collect(),
             new_text: edit.new_text.into_iter().map(Arc::from).collect(),
         })),
         proto::operation::Variant::Undo(undo) => Some(text::Operation::Undo {
@@ -449,7 +502,7 @@ pub fn deserialize_wire_operation(operation: proto::Operation) -> Option<text::O
                     replica_id: undo.replica_id as text::ReplicaId,
                     value: undo.local_timestamp,
                 },
-                version: deserialize_wire_version(&undo.version),
+                version: version_from_wire(&undo.version),
                 counts: undo
                     .counts
                     .into_iter()
@@ -469,11 +522,7 @@ pub fn deserialize_wire_operation(operation: proto::Operation) -> Option<text::O
     }
 }
 
-pub fn deserialize_range(range: proto::Range) -> Range<text::FullOffset> {
-    text::FullOffset(range.start as usize)..text::FullOffset(range.end as usize)
-}
-
-fn deserialize_wire_version(message: &[proto::VectorClockEntry]) -> clock::Global {
+fn version_from_wire(message: &[proto::VectorClockEntry]) -> clock::Global {
     let mut version = clock::Global::new();
     for entry in message {
         version.observe(clock::Local {
@@ -486,15 +535,23 @@ fn deserialize_wire_version(message: &[proto::VectorClockEntry]) -> clock::Globa
 
 mod storage {
     #![allow(non_snake_case)]
-
     use prost::Message;
-
     pub const SERIALIZATION_VERSION: i32 = 1;
 
     #[derive(Message)]
-    pub struct VectorClock {
-        #[prost(message, repeated, tag = "1")]
-        pub entries: Vec<VectorClockEntry>,
+    pub struct Operation {
+        #[prost(uint32, tag = "1")]
+        pub local_timestamp: u32,
+        #[prost(message, repeated, tag = "2")]
+        pub version: Vec<VectorClockEntry>,
+        #[prost(bool, tag = "3")]
+        pub is_undo: bool,
+        #[prost(message, repeated, tag = "4")]
+        pub edit_ranges: Vec<Range>,
+        #[prost(string, repeated, tag = "5")]
+        pub edit_texts: Vec<String>,
+        #[prost(message, repeated, tag = "6")]
+        pub undo_counts: Vec<UndoCount>,
     }
 
     #[derive(Message)]
@@ -505,14 +562,6 @@ mod storage {
         pub timestamp: u32,
     }
 
-    #[derive(Message)]
-    pub struct TextEdit {
-        #[prost(message, repeated, tag = "1")]
-        pub ranges: Vec<Range>,
-        #[prost(string, repeated, tag = "2")]
-        pub texts: Vec<String>,
-    }
-
     #[derive(Message)]
     pub struct Range {
         #[prost(uint64, tag = "1")]
@@ -521,12 +570,6 @@ mod storage {
         pub end: u64,
     }
 
-    #[derive(Message)]
-    pub struct Undo {
-        #[prost(message, repeated, tag = "1")]
-        pub entries: Vec<UndoCount>,
-    }
-
     #[derive(Message)]
     pub struct UndoCount {
         #[prost(uint32, tag = "1")]
@@ -537,82 +580,3 @@ mod storage {
         pub count: u32,
     }
 }
-
-fn version_to_storage(version: &Vec<proto::VectorClockEntry>) -> Vec<u8> {
-    storage::VectorClock {
-        entries: version
-            .iter()
-            .map(|entry| storage::VectorClockEntry {
-                replica_id: entry.replica_id,
-                timestamp: entry.timestamp,
-            })
-            .collect(),
-    }
-    .encode_to_vec()
-}
-
-fn version_from_storage(bytes: &[u8]) -> Result<Vec<proto::VectorClockEntry>> {
-    let clock = storage::VectorClock::decode(bytes).map_err(|error| anyhow!("{}", error))?;
-    Ok(clock
-        .entries
-        .into_iter()
-        .map(|entry| proto::VectorClockEntry {
-            replica_id: entry.replica_id,
-            timestamp: entry.timestamp,
-        })
-        .collect())
-}
-
-fn edit_operation_to_storage(ranges: &[proto::Range], texts: &[String]) -> Vec<u8> {
-    storage::TextEdit {
-        ranges: ranges
-            .iter()
-            .map(|range| storage::Range {
-                start: range.start,
-                end: range.end,
-            })
-            .collect(),
-        texts: texts.to_vec(),
-    }
-    .encode_to_vec()
-}
-
-fn edit_operation_from_storage(bytes: &[u8]) -> Result<(Vec<proto::Range>, Vec<String>)> {
-    let edit = storage::TextEdit::decode(bytes).map_err(|error| anyhow!("{}", error))?;
-    let ranges = edit
-        .ranges
-        .into_iter()
-        .map(|range| proto::Range {
-            start: range.start,
-            end: range.end,
-        })
-        .collect();
-    Ok((ranges, edit.texts))
-}
-
-fn undo_operation_to_storage(counts: &Vec<proto::UndoCount>) -> Vec<u8> {
-    storage::Undo {
-        entries: counts
-            .iter()
-            .map(|entry| storage::UndoCount {
-                replica_id: entry.replica_id,
-                local_timestamp: entry.local_timestamp,
-                count: entry.count,
-            })
-            .collect(),
-    }
-    .encode_to_vec()
-}
-
-fn undo_operation_from_storage(bytes: &[u8]) -> Result<Vec<proto::UndoCount>> {
-    let undo = storage::Undo::decode(bytes).map_err(|error| anyhow!("{}", error))?;
-    Ok(undo
-        .entries
-        .iter()
-        .map(|entry| proto::UndoCount {
-            replica_id: entry.replica_id,
-            local_timestamp: entry.local_timestamp,
-            count: entry.count,
-        })
-        .collect())
-}

crates/collab/src/db/tables/buffer_operation.rs 🔗

@@ -12,9 +12,6 @@ pub struct Model {
     pub lamport_timestamp: i32,
     #[sea_orm(primary_key)]
     pub replica_id: i32,
-    pub local_timestamp: i32,
-    pub version: Vec<u8>,
-    pub is_undo: bool,
     pub value: Vec<u8>,
 }