Start work on storing channel buffers

Created by Max Brunsfeld

Change summary

Cargo.lock                                                      |   2 
Cargo.toml                                                      |   1 
crates/collab/Cargo.toml                                        |   2 
crates/collab/migrations.sqlite/20221109000000_test_schema.sql  |  27 
crates/collab/migrations/20230819154600_add_channel_buffers.sql |  25 
crates/collab/src/db.rs                                         |   2 
crates/collab/src/db/ids.rs                                     |   1 
crates/collab/src/db/queries.rs                                 |   4 
crates/collab/src/db/queries/buffer_tests.rs                    |  41 
crates/collab/src/db/queries/buffers.rs                         | 271 +++
crates/collab/src/db/tables.rs                                  |   3 
crates/collab/src/db/tables/buffer.rs                           |  32 
crates/collab/src/db/tables/buffer_operation.rs                 |  37 
crates/collab/src/db/tables/buffer_snapshot.rs                  |  30 
crates/collab/src/db/test_db.rs                                 |   8 
crates/rpc/Cargo.toml                                           |   2 
16 files changed, 484 insertions(+), 4 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -1444,6 +1444,7 @@ dependencies = [
  "pretty_assertions",
  "project",
  "prometheus",
+ "prost 0.8.0",
  "rand 0.8.5",
  "reqwest",
  "rpc",
@@ -1456,6 +1457,7 @@ dependencies = [
  "settings",
  "sha-1 0.9.8",
  "sqlx",
+ "text",
  "theme",
  "time 0.3.24",
  "tokio",

Cargo.toml 🔗

@@ -96,6 +96,7 @@ log = { version = "0.4.16", features = ["kv_unstable_serde"] }
 ordered-float = { version = "2.1.1" }
 parking_lot = { version = "0.11.1" }
 postage = { version = "0.5", features = ["futures-traits"] }
+prost = { version = "0.8" }
 rand = { version = "0.8.5" }
 refineable = { path = "./crates/refineable" }
 regex = { version = "1.5" }

crates/collab/Cargo.toml 🔗

@@ -16,6 +16,7 @@ required-features = ["seed-support"]
 [dependencies]
 collections = { path = "../collections" }
 live_kit_server = { path = "../live_kit_server" }
+text = { path = "../text" }
 rpc = { path = "../rpc" }
 util = { path = "../util" }
 
@@ -35,6 +36,7 @@ log.workspace = true
 nanoid = "0.4"
 parking_lot.workspace = true
 prometheus = "0.13"
+prost.workspace = true
 rand.workspace = true
 reqwest = { version = "0.11", features = ["json"], optional = true }
 scrypt = "0.7"

crates/collab/migrations.sqlite/20221109000000_test_schema.sql 🔗

@@ -189,7 +189,8 @@ CREATE INDEX "index_followers_on_room_id" ON "followers" ("room_id");
 CREATE TABLE "channels" (
     "id" INTEGER PRIMARY KEY AUTOINCREMENT,
     "name" VARCHAR NOT NULL,
-    "created_at" TIMESTAMP NOT NULL DEFAULT now
+    "created_at" TIMESTAMP NOT NULL DEFAULT now,
+    "main_buffer_id" INTEGER REFERENCES buffers (id)
 );
 
 CREATE TABLE "channel_paths" (
@@ -208,3 +209,27 @@ CREATE TABLE "channel_members" (
 );
 
 CREATE UNIQUE INDEX "index_channel_members_on_channel_id_and_user_id" ON "channel_members" ("channel_id", "user_id");
+
+CREATE TABLE "buffers" (
+    "id" INTEGER PRIMARY KEY AUTOINCREMENT,
+    "epoch" INTEGER NOT NULL DEFAULT 0
+);
+
+CREATE TABLE "buffer_operations" (
+    "buffer_id" INTEGER NOT NULL REFERENCES buffers (id) ON DELETE CASCADE,
+    "epoch" INTEGER NOT NULL,
+    "replica_id" INTEGER NOT NULL,
+    "lamport_timestamp" INTEGER NOT NULL,
+    "local_timestamp" INTEGER NOT NULL,
+    "version" BLOB NOT NULL,
+    "is_undo" BOOLEAN NOT NULL,
+    "value" BLOB NOT NULL,
+    PRIMARY KEY(buffer_id, epoch, lamport_timestamp, replica_id)
+);
+
+CREATE TABLE "buffer_snapshots" (
+    "buffer_id" INTEGER NOT NULL REFERENCES buffers (id) ON DELETE CASCADE,
+    "epoch" INTEGER NOT NULL,
+    "text" TEXT NOT NULL,
+    PRIMARY KEY(buffer_id, epoch)
+);

crates/collab/migrations/20230819154600_add_channel_buffers.sql 🔗

@@ -0,0 +1,25 @@
+CREATE TABLE "buffers" (
+    "id" SERIAL PRIMARY KEY,
+    "epoch" INTEGER NOT NULL DEFAULT 0
+);
+
+CREATE TABLE "buffer_operations" (
+    "buffer_id" INTEGER NOT NULL REFERENCES buffers (id) ON DELETE CASCADE,
+    "epoch" INTEGER NOT NULL,
+    "replica_id" INTEGER NOT NULL,
+    "local_timestamp" INTEGER NOT NULL,
+    "lamport_timestamp" INTEGER NOT NULL,
+    "version" BYTEA NOT NULL,
+    "is_undo" BOOLEAN NOT NULL,
+    "value" BYTEA NOT NULL,
+    PRIMARY KEY(buffer_id, epoch, lamport_timestamp, replica_id)
+);
+
+CREATE TABLE "buffer_snapshots" (
+    "buffer_id" INTEGER NOT NULL REFERENCES buffers (id) ON DELETE CASCADE,
+    "epoch" INTEGER NOT NULL,
+    "text" TEXT NOT NULL,
+    PRIMARY KEY(buffer_id, epoch)
+);
+
+ALTER TABLE "channels" ADD COLUMN "main_buffer_id" INTEGER REFERENCES buffers (id);

crates/collab/src/db.rs 🔗

@@ -52,6 +52,8 @@ pub struct Database {
     runtime: Option<tokio::runtime::Runtime>,
 }
 
+// The `Database` type has so many methods that its impl blocks are split into
+// separate files in the `queries` folder.
 impl Database {
     pub async fn new(options: ConnectOptions, executor: Executor) -> Result<Self> {
         Ok(Self {

crates/collab/src/db/ids.rs 🔗

@@ -110,6 +110,7 @@ fn value_to_integer(v: Value) -> Result<i32, ValueTypeErr> {
     }
 }
 
+id_type!(BufferId);
 id_type!(AccessTokenId);
 id_type!(ChannelId);
 id_type!(ChannelMemberId);

crates/collab/src/db/queries.rs 🔗

@@ -1,6 +1,7 @@
 use super::*;
 
 pub mod access_tokens;
+pub mod buffers;
 pub mod channels;
 pub mod contacts;
 pub mod projects;
@@ -8,3 +9,6 @@ pub mod rooms;
 pub mod servers;
 pub mod signups;
 pub mod users;
+
+#[cfg(test)]
+pub mod buffer_tests;

crates/collab/src/db/queries/buffer_tests.rs 🔗

@@ -0,0 +1,41 @@
+use super::*;
+use crate::test_both_dbs;
+use language::proto;
+use text::Buffer;
+
+test_both_dbs!(test_buffers, test_buffers_postgres, test_buffers_sqlite);
+
+async fn test_buffers(db: &Arc<Database>) {
+    let buffer_id = db.create_buffer().await.unwrap();
+
+    let mut buffer = Buffer::new(0, 0, "".to_string());
+    let mut operations = Vec::new();
+    operations.push(buffer.edit([(0..0, "hello world")]));
+    operations.push(buffer.edit([(5..5, ", cruel")]));
+    operations.push(buffer.edit([(0..5, "goodbye")]));
+    operations.push(buffer.undo().unwrap().1);
+    assert_eq!(buffer.text(), "hello, cruel world");
+
+    let operations = operations
+        .into_iter()
+        .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
+        .collect::<Vec<_>>();
+
+    db.update_buffer(buffer_id, &operations).await.unwrap();
+
+    let buffer_data = db.get_buffer(buffer_id).await.unwrap();
+
+    let mut buffer_2 = Buffer::new(0, 0, buffer_data.base_text);
+    buffer_2
+        .apply_ops(buffer_data.operations.into_iter().map(|operation| {
+            let operation = proto::deserialize_operation(operation).unwrap();
+            if let language::Operation::Buffer(operation) = operation {
+                operation
+            } else {
+                unreachable!()
+            }
+        }))
+        .unwrap();
+
+    assert_eq!(buffer_2.text(), "hello, cruel world");
+}

crates/collab/src/db/queries/buffers.rs 🔗

@@ -0,0 +1,271 @@
+use super::*;
+use prost::Message;
+
+pub struct Buffer {
+    pub base_text: String,
+    pub operations: Vec<proto::Operation>,
+}
+
+impl Database {
+    pub async fn create_buffer(&self) -> Result<BufferId> {
+        self.transaction(|tx| async move {
+            let buffer = buffer::ActiveModel::new().insert(&*tx).await?;
+            Ok(buffer.id)
+        })
+        .await
+    }
+
+    pub async fn update_buffer(
+        &self,
+        buffer_id: BufferId,
+        operations: &[proto::Operation],
+    ) -> Result<()> {
+        self.transaction(|tx| async move {
+            let buffer = buffer::Entity::find_by_id(buffer_id)
+                .one(&*tx)
+                .await?
+                .ok_or_else(|| anyhow!("no such buffer"))?;
+            buffer_operation::Entity::insert_many(operations.iter().filter_map(|operation| {
+                match operation.variant.as_ref()? {
+                    proto::operation::Variant::Edit(operation) => {
+                        let value =
+                            serialize_edit_operation(&operation.ranges, &operation.new_text);
+                        let version = serialize_version(&operation.version);
+                        Some(buffer_operation::ActiveModel {
+                            buffer_id: ActiveValue::Set(buffer_id),
+                            epoch: ActiveValue::Set(buffer.epoch),
+                            replica_id: ActiveValue::Set(operation.replica_id as i32),
+                            lamport_timestamp: ActiveValue::Set(operation.lamport_timestamp as i32),
+                            local_timestamp: ActiveValue::Set(operation.local_timestamp as i32),
+                            is_undo: ActiveValue::Set(false),
+                            version: ActiveValue::Set(version),
+                            value: ActiveValue::Set(value),
+                        })
+                    }
+                    proto::operation::Variant::Undo(operation) => {
+                        let value = serialize_undo_operation(&operation.counts);
+                        let version = serialize_version(&operation.version);
+                        Some(buffer_operation::ActiveModel {
+                            buffer_id: ActiveValue::Set(buffer_id),
+                            epoch: ActiveValue::Set(buffer.epoch),
+                            replica_id: ActiveValue::Set(operation.replica_id as i32),
+                            lamport_timestamp: ActiveValue::Set(operation.lamport_timestamp as i32),
+                            local_timestamp: ActiveValue::Set(operation.local_timestamp as i32),
+                            is_undo: ActiveValue::Set(true),
+                            version: ActiveValue::Set(version),
+                            value: ActiveValue::Set(value),
+                        })
+                    }
+                    proto::operation::Variant::UpdateSelections(_) => None,
+                    proto::operation::Variant::UpdateDiagnostics(_) => None,
+                    proto::operation::Variant::UpdateCompletionTriggers(_) => None,
+                }
+            }))
+            .exec(&*tx)
+            .await?;
+
+            Ok(())
+        })
+        .await
+    }
+
+    pub async fn get_buffer(&self, id: BufferId) -> Result<Buffer> {
+        self.transaction(|tx| async move {
+            let buffer = buffer::Entity::find_by_id(id)
+                .one(&*tx)
+                .await?
+                .ok_or_else(|| anyhow!("no such buffer"))?;
+
+            let base_text = if buffer.epoch > 0 {
+                buffer_snapshot::Entity::find()
+                    .filter(
+                        buffer_snapshot::Column::BufferId
+                            .eq(id)
+                            .and(buffer_snapshot::Column::Epoch.eq(buffer.epoch)),
+                    )
+                    .one(&*tx)
+                    .await?
+                    .ok_or_else(|| anyhow!("no such snapshot"))?
+                    .text
+            } else {
+                String::new()
+            };
+
+            let mut rows = buffer_operation::Entity::find()
+                .filter(
+                    buffer_operation::Column::BufferId
+                        .eq(id)
+                        .and(buffer_operation::Column::Epoch.eq(buffer.epoch)),
+                )
+                .stream(&*tx)
+                .await?;
+            let mut operations = Vec::new();
+            while let Some(row) = rows.next().await {
+                let row = row?;
+                let version = deserialize_version(&row.version)?;
+                let operation = if row.is_undo {
+                    let counts = deserialize_undo_operation(&row.value)?;
+                    proto::operation::Variant::Undo(proto::operation::Undo {
+                        replica_id: row.replica_id as u32,
+                        local_timestamp: row.local_timestamp as u32,
+                        lamport_timestamp: row.lamport_timestamp as u32,
+                        version,
+                        counts,
+                    })
+                } else {
+                    let (ranges, new_text) = deserialize_edit_operation(&row.value)?;
+                    proto::operation::Variant::Edit(proto::operation::Edit {
+                        replica_id: row.replica_id as u32,
+                        local_timestamp: row.local_timestamp as u32,
+                        lamport_timestamp: row.lamport_timestamp as u32,
+                        version,
+                        ranges,
+                        new_text,
+                    })
+                };
+                operations.push(proto::Operation {
+                    variant: Some(operation),
+                })
+            }
+
+            Ok(Buffer {
+                base_text,
+                operations,
+            })
+        })
+        .await
+    }
+}
+
+mod storage {
+    #![allow(non_snake_case)]
+
+    use prost::Message;
+
+    pub const VERSION: usize = 1;
+
+    #[derive(Message)]
+    pub struct VectorClock {
+        #[prost(message, repeated, tag = "1")]
+        pub entries: Vec<VectorClockEntry>,
+    }
+
+    #[derive(Message)]
+    pub struct VectorClockEntry {
+        #[prost(uint32, tag = "1")]
+        pub replica_id: u32,
+        #[prost(uint32, tag = "2")]
+        pub timestamp: u32,
+    }
+
+    #[derive(Message)]
+    pub struct TextEdit {
+        #[prost(message, repeated, tag = "1")]
+        pub ranges: Vec<Range>,
+        #[prost(string, repeated, tag = "2")]
+        pub texts: Vec<String>,
+    }
+
+    #[derive(Message)]
+    pub struct Range {
+        #[prost(uint64, tag = "1")]
+        pub start: u64,
+        #[prost(uint64, tag = "2")]
+        pub end: u64,
+    }
+
+    #[derive(Message)]
+    pub struct Undo {
+        #[prost(message, repeated, tag = "1")]
+        pub entries: Vec<UndoCount>,
+    }
+
+    #[derive(Message)]
+    pub struct UndoCount {
+        #[prost(uint32, tag = "1")]
+        pub replica_id: u32,
+        #[prost(uint32, tag = "2")]
+        pub local_timestamp: u32,
+        #[prost(uint32, tag = "3")]
+        pub count: u32,
+    }
+}
+
+fn serialize_version(version: &Vec<proto::VectorClockEntry>) -> Vec<u8> {
+    storage::VectorClock {
+        entries: version
+            .iter()
+            .map(|entry| storage::VectorClockEntry {
+                replica_id: entry.replica_id,
+                timestamp: entry.timestamp,
+            })
+            .collect(),
+    }
+    .encode_to_vec()
+}
+
+fn deserialize_version(bytes: &[u8]) -> Result<Vec<proto::VectorClockEntry>> {
+    let clock = storage::VectorClock::decode(bytes).map_err(|error| anyhow!("{}", error))?;
+    Ok(clock
+        .entries
+        .into_iter()
+        .map(|entry| proto::VectorClockEntry {
+            replica_id: entry.replica_id,
+            timestamp: entry.timestamp,
+        })
+        .collect())
+}
+
+fn serialize_edit_operation(ranges: &[proto::Range], texts: &[String]) -> Vec<u8> {
+    storage::TextEdit {
+        ranges: ranges
+            .iter()
+            .map(|range| storage::Range {
+                start: range.start,
+                end: range.end,
+            })
+            .collect(),
+        texts: texts.to_vec(),
+    }
+    .encode_to_vec()
+}
+
+fn deserialize_edit_operation(bytes: &[u8]) -> Result<(Vec<proto::Range>, Vec<String>)> {
+    let edit = storage::TextEdit::decode(bytes).map_err(|error| anyhow!("{}", error))?;
+    let ranges = edit
+        .ranges
+        .into_iter()
+        .map(|range| proto::Range {
+            start: range.start,
+            end: range.end,
+        })
+        .collect();
+    Ok((ranges, edit.texts))
+}
+
+fn serialize_undo_operation(counts: &Vec<proto::UndoCount>) -> Vec<u8> {
+    storage::Undo {
+        entries: counts
+            .iter()
+            .map(|entry| storage::UndoCount {
+                replica_id: entry.replica_id,
+                local_timestamp: entry.local_timestamp,
+                count: entry.count,
+            })
+            .collect(),
+    }
+    .encode_to_vec()
+}
+
+fn deserialize_undo_operation(bytes: &[u8]) -> Result<Vec<proto::UndoCount>> {
+    let undo = storage::Undo::decode(bytes).map_err(|error| anyhow!("{}", error))?;
+    Ok(undo
+        .entries
+        .iter()
+        .map(|entry| proto::UndoCount {
+            replica_id: entry.replica_id,
+            local_timestamp: entry.local_timestamp,
+            count: entry.count,
+        })
+        .collect())
+}

crates/collab/src/db/tables.rs 🔗

@@ -1,4 +1,7 @@
 pub mod access_token;
+pub mod buffer;
+pub mod buffer_operation;
+pub mod buffer_snapshot;
 pub mod channel;
 pub mod channel_member;
 pub mod channel_path;

crates/collab/src/db/tables/buffer.rs 🔗

@@ -0,0 +1,32 @@
+use crate::db::BufferId;
+use sea_orm::entity::prelude::*;
+
+#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
+#[sea_orm(table_name = "buffers")]
+pub struct Model {
+    #[sea_orm(primary_key)]
+    pub id: BufferId,
+    pub epoch: i32,
+}
+
+#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
+pub enum Relation {
+    #[sea_orm(has_many = "super::buffer_operation::Entity")]
+    Operations,
+    #[sea_orm(has_many = "super::buffer_snapshot::Entity")]
+    Snapshots,
+}
+
+impl Related<super::buffer_operation::Entity> for Entity {
+    fn to() -> RelationDef {
+        Relation::Operations.def()
+    }
+}
+
+impl Related<super::buffer_snapshot::Entity> for Entity {
+    fn to() -> RelationDef {
+        Relation::Snapshots.def()
+    }
+}
+
+impl ActiveModelBehavior for ActiveModel {}

crates/collab/src/db/tables/buffer_operation.rs 🔗

@@ -0,0 +1,37 @@
+use crate::db::BufferId;
+use sea_orm::entity::prelude::*;
+
+#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
+#[sea_orm(table_name = "buffer_operations")]
+pub struct Model {
+    #[sea_orm(primary_key)]
+    pub buffer_id: BufferId,
+    #[sea_orm(primary_key)]
+    pub epoch: i32,
+    #[sea_orm(primary_key)]
+    pub lamport_timestamp: i32,
+    #[sea_orm(primary_key)]
+    pub replica_id: i32,
+    pub local_timestamp: i32,
+    pub version: Vec<u8>,
+    pub is_undo: bool,
+    pub value: Vec<u8>,
+}
+
+#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
+pub enum Relation {
+    #[sea_orm(
+        belongs_to = "super::buffer::Entity",
+        from = "Column::BufferId",
+        to = "super::buffer::Column::Id"
+    )]
+    Buffer,
+}
+
+impl Related<super::buffer::Entity> for Entity {
+    fn to() -> RelationDef {
+        Relation::Buffer.def()
+    }
+}
+
+impl ActiveModelBehavior for ActiveModel {}

crates/collab/src/db/tables/buffer_snapshot.rs 🔗

@@ -0,0 +1,30 @@
+use crate::db::BufferId;
+use sea_orm::entity::prelude::*;
+
+#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
+#[sea_orm(table_name = "buffer_snapshots")]
+pub struct Model {
+    #[sea_orm(primary_key)]
+    pub buffer_id: BufferId,
+    #[sea_orm(primary_key)]
+    pub epoch: i32,
+    pub text: String,
+}
+
+#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
+pub enum Relation {
+    #[sea_orm(
+        belongs_to = "super::buffer::Entity",
+        from = "Column::BufferId",
+        to = "super::buffer::Column::Id"
+    )]
+    Buffer,
+}
+
+impl Related<super::buffer::Entity> for Entity {
+    fn to() -> RelationDef {
+        Relation::Buffer.def()
+    }
+}
+
+impl ActiveModelBehavior for ActiveModel {}

crates/collab/src/db/test_db.rs 🔗

@@ -96,13 +96,17 @@ macro_rules! test_both_dbs {
     ($test_name:ident, $postgres_test_name:ident, $sqlite_test_name:ident) => {
         #[gpui::test]
         async fn $postgres_test_name() {
-            let test_db = TestDb::postgres(Deterministic::new(0).build_background());
+            let test_db = crate::db::test_db::TestDb::postgres(
+                gpui::executor::Deterministic::new(0).build_background(),
+            );
             $test_name(test_db.db()).await;
         }
 
         #[gpui::test]
         async fn $sqlite_test_name() {
-            let test_db = TestDb::sqlite(Deterministic::new(0).build_background());
+            let test_db = crate::db::test_db::TestDb::sqlite(
+                gpui::executor::Deterministic::new(0).build_background(),
+            );
             $test_name(test_db.db()).await;
         }
     };

crates/rpc/Cargo.toml 🔗

@@ -23,7 +23,7 @@ async-tungstenite = "0.16"
 base64 = "0.13"
 futures.workspace = true
 parking_lot.workspace = true
-prost = "0.8"
+prost.workspace = true
 rand.workspace = true
 rsa = "0.4"
 serde.workspace = true