WIP: pass synchronize channel buffers integration test

Mikayla created

Change summary

crates/channel/src/channel_buffer.rs                            | 113 +-
crates/collab/migrations.sqlite/20221109000000_test_schema.sql  |  21 
crates/collab/migrations/20230819154600_add_channel_buffers.sql |  18 
crates/collab/src/db/ids.rs                                     |   1 
crates/collab/src/db/queries/buffers.rs                         | 105 ++
crates/collab/src/db/queries/channels.rs                        |  28 
crates/collab/src/db/tables.rs                                  |   1 
crates/collab/src/db/tables/buffer.rs                           |  23 
crates/collab/src/db/tables/channel.rs                          |   9 
crates/collab/src/db/tables/channel_buffer_collaborator.rs      |  42 +
crates/collab/src/db/tests/buffer_tests.rs                      |  57 +
crates/collab/src/rpc.rs                                        |  52 +
crates/collab/src/tests/channel_buffer_tests.rs                 |  40 
crates/rpc/proto/zed.proto                                      |  25 
crates/rpc/src/proto.rs                                         |  11 
15 files changed, 411 insertions(+), 135 deletions(-)

Detailed changes

crates/channel/src/channel_buffer.rs 🔗

@@ -1,9 +1,10 @@
 use crate::ChannelId;
 use anyhow::Result;
 use client::Client;
-use gpui::{Entity, ModelContext, ModelHandle, Task};
-use rpc::proto::GetChannelBuffer;
+use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task};
+use rpc::{proto, TypedEnvelope};
 use std::sync::Arc;
+use util::ResultExt;
 
 // Open the channel document
 // ChannelDocumentView { ChannelDocument, Editor } -> On clone, clones internal ChannelDocument handle, instantiates new editor
@@ -14,9 +15,12 @@ use std::sync::Arc;
 //     ChannleBuffers: HashMap<bufferId, ModelHandle<language::Buffer>>
 // }
 
+type BufferId = u64;
+
 pub struct ChannelBuffer {
     channel_id: ChannelId,
-    buffer: Option<ModelHandle<language::Buffer>>,
+    buffer_id: BufferId,
+    buffer: ModelHandle<language::Buffer>,
     client: Arc<Client>,
 }
 
@@ -28,53 +32,76 @@ impl ChannelBuffer {
     pub fn for_channel(
         channel_id: ChannelId,
         client: Arc<Client>,
-        cx: &mut ModelContext<Self>,
-    ) -> Self {
-        Self {
-            channel_id,
-            client,
-            buffer: None,
-        }
-    }
+        cx: &mut AppContext,
+    ) -> Task<Result<ModelHandle<Self>>> {
+        cx.spawn(|mut cx| async move {
+            let response = client
+                .request(proto::OpenChannelBuffer { channel_id })
+                .await?;
 
-    fn on_buffer_update(
-        &mut self,
-        buffer: ModelHandle<language::Buffer>,
-        event: &language::Event,
-        cx: &mut ModelContext<Self>,
-    ) {
-        //
-    }
+            let base_text = response.base_text;
+            let operations = response
+                .operations
+                .into_iter()
+                .map(language::proto::deserialize_operation)
+                .collect::<Result<Vec<_>, _>>()?;
+            let buffer_id = response.buffer_id;
 
-    pub fn buffer(
-        &mut self,
-        cx: &mut ModelContext<Self>,
-    ) -> Task<Result<ModelHandle<language::Buffer>>> {
-        if let Some(buffer) = &self.buffer {
-            Task::ready(Ok(buffer.clone()))
-        } else {
-            let channel_id = self.channel_id;
-            let client = self.client.clone();
-            cx.spawn(|this, mut cx| async move {
-                let response = client.request(GetChannelBuffer { channel_id }).await?;
+            let buffer = cx.add_model(|cx| language::Buffer::new(0, base_text, cx));
+            buffer.update(&mut cx, |buffer, cx| buffer.apply_ops(operations, cx))?;
 
-                let base_text = response.base_text;
-                let operations = response
-                    .operations
-                    .into_iter()
-                    .map(language::proto::deserialize_operation)
-                    .collect::<Result<Vec<_>, _>>()?;
+            anyhow::Ok(cx.add_model(|cx| {
+                cx.subscribe(&buffer, Self::on_buffer_update).detach();
+                client.add_model_message_handler(Self::handle_update_channel_buffer);
+                Self {
+                    buffer_id,
+                    buffer,
+                    client,
+                    channel_id,
+                }
+            }))
+        })
+    }
+
+    async fn handle_update_channel_buffer(
+        this: ModelHandle<Self>,
+        update_channel_buffer: TypedEnvelope<proto::UpdateChannelBuffer>,
+        _: Arc<Client>,
+        mut cx: AsyncAppContext,
+    ) -> Result<()> {
+        let ops = update_channel_buffer
+            .payload
+            .operations
+            .into_iter()
+            .map(language::proto::deserialize_operation)
+            .collect::<Result<Vec<_>, _>>()?;
 
-                this.update(&mut cx, |this, cx| {
-                    let buffer = cx.add_model(|cx| language::Buffer::new(0, base_text, cx));
-                    buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))?;
+        this.update(&mut cx, |this, cx| {
+            this.buffer
+                .update(cx, |buffer, cx| buffer.apply_ops(ops, cx))
+        })?;
 
-                    cx.subscribe(&buffer, Self::on_buffer_update).detach();
+        Ok(())
+    }
 
-                    this.buffer = Some(buffer.clone());
-                    anyhow::Ok(buffer)
+    fn on_buffer_update(
+        &mut self,
+        _: ModelHandle<language::Buffer>,
+        event: &language::Event,
+        _: &mut ModelContext<Self>,
+    ) {
+        if let language::Event::Operation(operation) = event {
+            let operation = language::proto::serialize_operation(operation);
+            self.client
+                .send(proto::UpdateChannelBuffer {
+                    buffer_id: self.buffer_id,
+                    operations: vec![operation],
                 })
-            })
+                .log_err();
         }
     }
+
+    pub fn buffer(&self) -> ModelHandle<language::Buffer> {
+        self.buffer.clone()
+    }
 }

crates/collab/migrations.sqlite/20221109000000_test_schema.sql 🔗

@@ -189,8 +189,7 @@ CREATE INDEX "index_followers_on_room_id" ON "followers" ("room_id");
 CREATE TABLE "channels" (
     "id" INTEGER PRIMARY KEY AUTOINCREMENT,
     "name" VARCHAR NOT NULL,
-    "created_at" TIMESTAMP NOT NULL DEFAULT now,
-    "main_buffer_id" INTEGER REFERENCES buffers (id)
+    "created_at" TIMESTAMP NOT NULL DEFAULT now
 );
 
 CREATE TABLE "channel_paths" (
@@ -212,9 +211,12 @@ CREATE UNIQUE INDEX "index_channel_members_on_channel_id_and_user_id" ON "channe
 
 CREATE TABLE "buffers" (
     "id" INTEGER PRIMARY KEY AUTOINCREMENT,
+    "channel_id" INTEGER NOT NULL REFERENCES channels (id) ON DELETE CASCADE,
     "epoch" INTEGER NOT NULL DEFAULT 0
 );
 
+CREATE INDEX "index_buffers_on_channel_id" ON "buffers" ("channel_id");
+
 CREATE TABLE "buffer_operations" (
     "buffer_id" INTEGER NOT NULL REFERENCES buffers (id) ON DELETE CASCADE,
     "epoch" INTEGER NOT NULL,
@@ -233,3 +235,18 @@ CREATE TABLE "buffer_snapshots" (
     "text" TEXT NOT NULL,
     PRIMARY KEY(buffer_id, epoch)
 );
+
+CREATE TABLE "channel_buffer_collaborators" (
+    "id" INTEGER PRIMARY KEY AUTOINCREMENT,
+    "buffer_id" INTEGER NOT NULL REFERENCES buffers (id) ON DELETE CASCADE,
+    "connection_id" INTEGER NOT NULL,
+    "connection_server_id" INTEGER NOT NULL REFERENCES servers (id) ON DELETE CASCADE,
+    "user_id" INTEGER NOT NULL REFERENCES users (id) ON DELETE CASCADE,
+    "replica_id" INTEGER NOT NULL
+);
+
+CREATE INDEX "index_channel_buffer_collaborators_on_buffer_id" ON "channel_buffer_collaborators" ("buffer_id");
+CREATE UNIQUE INDEX "index_channel_buffer_collaborators_on_buffer_id_and_replica_id" ON "channel_buffer_collaborators" ("buffer_id", "replica_id");
+CREATE INDEX "index_channel_buffer_collaborators_on_connection_server_id" ON "channel_buffer_collaborators" ("connection_server_id");
+CREATE INDEX "index_channel_buffer_collaborators_on_connection_id" ON "channel_buffer_collaborators" ("connection_id");
+CREATE UNIQUE INDEX "index_channel_buffer_collaborators_on_buffer_id_connection_id_and_server_id" ON "channel_buffer_collaborators" ("buffer_id", "connection_id", "connection_server_id");

crates/collab/migrations/20230819154600_add_channel_buffers.sql 🔗

@@ -1,8 +1,11 @@
 CREATE TABLE "buffers" (
     "id" SERIAL PRIMARY KEY,
+    "channel_id" INTEGER NOT NULL REFERENCES channels (id) ON DELETE CASCADE,
     "epoch" INTEGER NOT NULL DEFAULT 0
 );
 
+CREATE INDEX "index_buffers_on_channel_id" ON "buffers" ("channel_id");
+
 CREATE TABLE "buffer_operations" (
     "buffer_id" INTEGER NOT NULL REFERENCES buffers (id) ON DELETE CASCADE,
     "epoch" INTEGER NOT NULL,
@@ -22,4 +25,17 @@ CREATE TABLE "buffer_snapshots" (
     PRIMARY KEY(buffer_id, epoch)
 );
 
-ALTER TABLE "channels" ADD COLUMN "main_buffer_id" INTEGER REFERENCES buffers (id);
+CREATE TABLE "channel_buffer_collaborators" (
+    "id" SERIAL PRIMARY KEY,
+    "buffer_id" INTEGER NOT NULL REFERENCES buffers (id) ON DELETE CASCADE,
+    "connection_id" INTEGER NOT NULL,
+    "connection_server_id" INTEGER NOT NULL REFERENCES servers (id) ON DELETE CASCADE,
+    "user_id" INTEGER NOT NULL REFERENCES users (id) ON DELETE CASCADE,
+    "replica_id" INTEGER NOT NULL
+);
+
+CREATE INDEX "index_channel_buffer_collaborators_on_buffer_id" ON "channel_buffer_collaborators" ("buffer_id");
+CREATE UNIQUE INDEX "index_channel_buffer_collaborators_on_buffer_id_and_replica_id" ON "channel_buffer_collaborators" ("buffer_id", "replica_id");
+CREATE INDEX "index_channel_buffer_collaborators_on_connection_server_id" ON "channel_buffer_collaborators" ("connection_server_id");
+CREATE INDEX "index_channel_buffer_collaborators_on_connection_id" ON "channel_buffer_collaborators" ("connection_id");
+CREATE UNIQUE INDEX "index_channel_buffer_collaborators_on_buffer_id_connection_id_and_server_id" ON "channel_buffer_collaborators" ("buffer_id", "connection_id", "connection_server_id");

crates/collab/src/db/ids.rs 🔗

@@ -124,3 +124,4 @@ id_type!(ReplicaId);
 id_type!(ServerId);
 id_type!(SignupId);
 id_type!(UserId);
+id_type!(ChannelBufferCollaboratorId);

crates/collab/src/db/queries/buffers.rs 🔗

@@ -1,20 +1,12 @@
 use super::*;
 use prost::Message;
 
-pub struct Buffer {
+pub struct ChannelBuffer {
     pub base_text: String,
     pub operations: Vec<proto::Operation>,
 }
 
 impl Database {
-    pub async fn create_buffer(&self) -> Result<BufferId> {
-        self.transaction(|tx| async move {
-            let buffer = buffer::ActiveModel::new().insert(&*tx).await?;
-            Ok(buffer.id)
-        })
-        .await
-    }
-
     pub async fn update_buffer(
         &self,
         buffer_id: BufferId,
@@ -69,13 +61,65 @@ impl Database {
         .await
     }
 
-    pub async fn get_buffer(&self, id: BufferId) -> Result<Buffer> {
+    pub async fn join_buffer_for_channel(
+        &self,
+        channel_id: ChannelId,
+        user_id: UserId,
+        connection: ConnectionId,
+    ) -> Result<ChannelBuffer> {
         self.transaction(|tx| async move {
-            let buffer = buffer::Entity::find_by_id(id)
-                .one(&*tx)
-                .await?
-                .ok_or_else(|| anyhow!("no such buffer"))?;
+            let tx = tx;
 
+            // Get or create buffer from channel
+            self.check_user_is_channel_member(channel_id, user_id, &tx)
+                .await?;
+
+            let buffer = channel::Model {
+                id: channel_id,
+                ..Default::default()
+            }
+            .find_related(buffer::Entity)
+            .one(&*tx)
+            .await?;
+
+            let buffer = if let Some(buffer) = buffer {
+                buffer
+            } else {
+                let buffer = buffer::ActiveModel {
+                    channel_id: ActiveValue::Set(channel_id),
+                    ..Default::default()
+                }
+                .insert(&*tx)
+                .await?;
+                buffer
+            };
+
+            // Join the collaborators
+            let collaborators = buffer
+                .find_related(channel_buffer_collaborator::Entity)
+                .all(&*tx)
+                .await?;
+            let replica_ids = collaborators
+                .iter()
+                .map(|c| c.replica_id)
+                .collect::<HashSet<_>>();
+            let mut replica_id = ReplicaId(0);
+            while replica_ids.contains(&replica_id) {
+                replica_id.0 += 1;
+            }
+            channel_buffer_collaborator::ActiveModel {
+                buffer_id: ActiveValue::Set(buffer.id),
+                connection_id: ActiveValue::Set(connection.id as i32),
+                connection_server_id: ActiveValue::Set(ServerId(connection.owner_id as i32)),
+                user_id: ActiveValue::Set(user_id),
+                replica_id: ActiveValue::Set(replica_id),
+                ..Default::default()
+            }
+            .insert(&*tx)
+            .await?;
+
+            // Assemble the buffer state
+            let id = buffer.id;
             let base_text = if buffer.epoch > 0 {
                 buffer_snapshot::Entity::find()
                     .filter(
@@ -128,13 +172,44 @@ impl Database {
                 })
             }
 
-            Ok(Buffer {
+            Ok(ChannelBuffer {
                 base_text,
                 operations,
             })
         })
         .await
     }
+
+    pub async fn get_buffer_collaborators(&self, buffer: BufferId) -> Result<()> {
+        todo!()
+    }
+
+    pub async fn leave_buffer(&self, buffer: BufferId, user: UserId) -> Result<()> {
+        self.transaction(|tx| async move {
+            //TODO
+            // let tx = tx;
+            // let channel = channel::Entity::find_by_id(channel_id)
+            //     .one(&*tx)
+            //     .await?
+            //     .ok_or_else(|| anyhow!("invalid channel"))?;
+
+            // if let Some(id) = channel.main_buffer_id {
+            //     return Ok(id);
+            // } else {
+            //     let buffer = buffer::ActiveModel::new().insert(&*tx).await?;
+            //     channel::ActiveModel {
+            //         id: ActiveValue::Unchanged(channel_id),
+            //         main_buffer_id: ActiveValue::Set(Some(buffer.id)),
+            //         ..Default::default()
+            //     }
+            //     .update(&*tx)
+            //     .await?;
+            //     Ok(buffer.id)
+            // }
+            Ok(())
+        })
+        .await
+    }
 }
 
 mod storage {

crates/collab/src/db/queries/channels.rs 🔗

@@ -689,34 +689,6 @@ impl Database {
         })
         .await
     }
-
-    pub async fn get_or_create_buffer_for_channel(
-        &self,
-        channel_id: ChannelId,
-    ) -> Result<BufferId> {
-        self.transaction(|tx| async move {
-            let tx = tx;
-            let channel = channel::Entity::find_by_id(channel_id)
-                .one(&*tx)
-                .await?
-                .ok_or_else(|| anyhow!("invalid channel"))?;
-
-            if let Some(id) = channel.main_buffer_id {
-                return Ok(id);
-            } else {
-                let buffer = buffer::ActiveModel::new().insert(&*tx).await?;
-                channel::ActiveModel {
-                    id: ActiveValue::Unchanged(channel_id),
-                    main_buffer_id: ActiveValue::Set(Some(buffer.id)),
-                    ..Default::default()
-                }
-                .update(&*tx)
-                .await?;
-                Ok(buffer.id)
-            }
-        })
-        .await
-    }
 }
 
 #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]

crates/collab/src/db/tables.rs 🔗

@@ -3,6 +3,7 @@ pub mod buffer;
 pub mod buffer_operation;
 pub mod buffer_snapshot;
 pub mod channel;
+pub mod channel_buffer_collaborator;
 pub mod channel_member;
 pub mod channel_path;
 pub mod contact;

crates/collab/src/db/tables/buffer.rs 🔗

@@ -1,4 +1,4 @@
-use crate::db::BufferId;
+use crate::db::{BufferId, ChannelId};
 use sea_orm::entity::prelude::*;
 
 #[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
@@ -7,6 +7,7 @@ pub struct Model {
     #[sea_orm(primary_key)]
     pub id: BufferId,
     pub epoch: i32,
+    pub channel_id: ChannelId,
 }
 
 #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
@@ -15,6 +16,14 @@ pub enum Relation {
     Operations,
     #[sea_orm(has_many = "super::buffer_snapshot::Entity")]
     Snapshots,
+    #[sea_orm(
+        belongs_to = "super::channel::Entity",
+        from = "Column::ChannelId",
+        to = "super::channel::Column::Id"
+    )]
+    Channel,
+    #[sea_orm(has_many = "super::channel_buffer_collaborator::Entity")]
+    Collaborators,
 }
 
 impl Related<super::buffer_operation::Entity> for Entity {
@@ -29,4 +38,16 @@ impl Related<super::buffer_snapshot::Entity> for Entity {
     }
 }
 
+impl Related<super::channel::Entity> for Entity {
+    fn to() -> RelationDef {
+        Relation::Channel.def()
+    }
+}
+
+impl Related<super::channel_buffer_collaborator::Entity> for Entity {
+    fn to() -> RelationDef {
+        Relation::Collaborators.def()
+    }
+}
+
 impl ActiveModelBehavior for ActiveModel {}

crates/collab/src/db/tables/channel.rs 🔗

@@ -7,7 +7,6 @@ pub struct Model {
     #[sea_orm(primary_key)]
     pub id: ChannelId,
     pub name: String,
-    pub main_buffer_id: Option<BufferId>,
 }
 
 impl ActiveModelBehavior for ActiveModel {}
@@ -16,6 +15,8 @@ impl ActiveModelBehavior for ActiveModel {}
 pub enum Relation {
     #[sea_orm(has_one = "super::room::Entity")]
     Room,
+    #[sea_orm(has_one = "super::room::Entity")]
+    Buffer,
     #[sea_orm(has_many = "super::channel_member::Entity")]
     Member,
 }
@@ -31,3 +32,9 @@ impl Related<super::room::Entity> for Entity {
         Relation::Room.def()
     }
 }
+
+impl Related<super::buffer::Entity> for Entity {
+    fn to() -> RelationDef {
+        Relation::Buffer.def()
+    }
+}

crates/collab/src/db/tables/channel_buffer_collaborator.rs 🔗

@@ -0,0 +1,42 @@
+use crate::db::{BufferId, ChannelBufferCollaboratorId, ReplicaId, ServerId, UserId};
+use rpc::ConnectionId;
+use sea_orm::entity::prelude::*;
+
+#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
+#[sea_orm(table_name = "channel_buffer_collaborators")]
+pub struct Model {
+    #[sea_orm(primary_key)]
+    pub id: ChannelBufferCollaboratorId,
+    pub buffer_id: BufferId,
+    pub connection_id: i32,
+    pub connection_server_id: ServerId,
+    pub user_id: UserId,
+    pub replica_id: ReplicaId,
+}
+
+impl Model {
+    pub fn connection(&self) -> ConnectionId {
+        ConnectionId {
+            owner_id: self.connection_server_id.0 as u32,
+            id: self.connection_id as u32,
+        }
+    }
+}
+
+#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
+pub enum Relation {
+    #[sea_orm(
+        belongs_to = "super::buffer::Entity",
+        from = "Column::BufferId",
+        to = "super::buffer::Column::Id"
+    )]
+    Buffer,
+}
+
+impl Related<super::buffer::Entity> for Entity {
+    fn to() -> RelationDef {
+        Relation::Buffer.def()
+    }
+}
+
+impl ActiveModelBehavior for ActiveModel {}

crates/collab/src/db/tests/buffer_tests.rs 🔗

@@ -6,7 +6,60 @@ use text::Buffer;
 test_both_dbs!(test_buffers, test_buffers_postgres, test_buffers_sqlite);
 
 async fn test_buffers(db: &Arc<Database>) {
-    let buffer_id = db.create_buffer().await.unwrap();
+    // Prep database test info
+    let a_id = db
+        .create_user(
+            "user_a@example.com",
+            false,
+            NewUserParams {
+                github_login: "user_a".into(),
+                github_user_id: 101,
+                invite_count: 0,
+            },
+        )
+        .await
+        .unwrap()
+        .user_id;
+    let b_id = db
+        .create_user(
+            "user_b@example.com",
+            false,
+            NewUserParams {
+                github_login: "user_b".into(),
+                github_user_id: 102,
+                invite_count: 0,
+            },
+        )
+        .await
+        .unwrap()
+        .user_id;
+    // This user will not be a part of the channel
+    let c_id = db
+        .create_user(
+            "user_b@example.com",
+            false,
+            NewUserParams {
+                github_login: "user_b".into(),
+                github_user_id: 102,
+                invite_count: 0,
+            },
+        )
+        .await
+        .unwrap()
+        .user_id;
+
+    let zed_id = db.create_root_channel("zed", "1", a_id).await.unwrap();
+
+    db.invite_channel_member(zed_id, b_id, a_id, false)
+        .await
+        .unwrap();
+
+    db.respond_to_channel_invite(zed_id, b_id, true)
+        .await
+        .unwrap();
+
+    // TODO: Join buffer
+    let buffer_id = db.get_or_create_buffer_for_channel(zed_id);
 
     let mut buffer = Buffer::new(0, 0, "".to_string());
     let mut operations = Vec::new();
@@ -23,7 +76,7 @@ async fn test_buffers(db: &Arc<Database>) {
 
     db.update_buffer(buffer_id, &operations).await.unwrap();
 
-    let buffer_data = db.get_buffer(buffer_id).await.unwrap();
+    let buffer_data = db.open_buffer(buffer_id).await.unwrap();
 
     let mut buffer_2 = Buffer::new(0, 0, buffer_data.base_text);
     buffer_2

crates/collab/src/rpc.rs 🔗

@@ -2,7 +2,10 @@ mod connection_pool;
 
 use crate::{
     auth,
-    db::{self, ChannelId, ChannelsForUser, Database, ProjectId, RoomId, ServerId, User, UserId},
+    db::{
+        self, BufferId, ChannelId, ChannelsForUser, Database, ProjectId, RoomId, ServerId, User,
+        UserId,
+    },
     executor::Executor,
     AppState, Result,
 };
@@ -35,8 +38,8 @@ use lazy_static::lazy_static;
 use prometheus::{register_int_gauge, IntGauge};
 use rpc::{
     proto::{
-        self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, GetChannelBufferResponse,
-        LiveKitConnectionInfo, RequestMessage,
+        self, Ack, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, LiveKitConnectionInfo,
+        OpenChannelBufferResponse, RequestMessage,
     },
     Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
 };
@@ -248,7 +251,9 @@ impl Server {
             .add_request_handler(remove_channel_member)
             .add_request_handler(set_channel_member_admin)
             .add_request_handler(rename_channel)
-            .add_request_handler(get_channel_buffer)
+            .add_request_handler(open_channel_buffer)
+            .add_request_handler(close_channel_buffer)
+            .add_message_handler(update_channel_buffer)
             .add_request_handler(get_channel_members)
             .add_request_handler(respond_to_channel_invite)
             .add_request_handler(join_channel)
@@ -2479,9 +2484,9 @@ async fn join_channel(
     Ok(())
 }
 
-async fn get_channel_buffer(
-    request: proto::GetChannelBuffer,
-    response: Response<proto::GetChannelBuffer>,
+async fn open_channel_buffer(
+    request: proto::OpenChannelBuffer,
+    response: Response<proto::OpenChannelBuffer>,
     session: Session,
 ) -> Result<()> {
     let db = session.db().await;
@@ -2489,9 +2494,12 @@ async fn get_channel_buffer(
 
     let buffer_id = db.get_or_create_buffer_for_channel(channel_id).await?;
 
-    let buffer = db.get_buffer(buffer_id).await?;
+    // TODO: join channel_buffer
+
+    let buffer = db.open_buffer(buffer_id).await?;
 
-    response.send(GetChannelBufferResponse {
+    response.send(OpenChannelBufferResponse {
+        buffer_id: buffer_id.to_proto(),
         base_text: buffer.base_text,
         operations: buffer.operations,
     })?;
@@ -2499,6 +2507,32 @@ async fn get_channel_buffer(
     Ok(())
 }
 
+async fn close_channel_buffer(
+    request: proto::CloseChannelBuffer,
+    response: Response<proto::CloseChannelBuffer>,
+    session: Session,
+) -> Result<()> {
+    let db = session.db().await;
+    let buffer_id = BufferId::from_proto(request.buffer_id);
+
+    // TODO: close channel buffer here
+    //
+    response.send(Ack {})?;
+
+    Ok(())
+}
+
+async fn update_channel_buffer(
+    request: proto::UpdateChannelBuffer,
+    session: Session,
+) -> Result<()> {
+    let db = session.db().await;
+
+    // TODO: Broadcast to buffer members
+
+    Ok(())
+}
+
 async fn update_diff_base(request: proto::UpdateDiffBase, session: Session) -> Result<()> {
     let project_id = ProjectId::from_proto(request.project_id);
     let project_connection_ids = session

crates/collab/src/tests/channel_buffer_tests.rs 🔗

@@ -19,45 +19,39 @@ async fn test_channel_buffers(
         .make_channel("zed", (&client_a, cx_a), &mut [(&client_b, cx_b)])
         .await;
 
-    let a_document =
-        cx_a.add_model(|cx| ChannelBuffer::for_channel(zed_id, client_a.client().to_owned(), cx));
-    let channel_buffer_a = a_document
-        .update(cx_a, |doc, cx| doc.buffer(cx))
+    let channel_buffer_a = cx_a
+        .update(|cx| ChannelBuffer::for_channel(zed_id, client_a.client().to_owned(), cx))
         .await
         .unwrap();
 
-    edit_channel_buffer(&channel_buffer_a, cx_a, [(0..0, "hello world")]);
-    edit_channel_buffer(&channel_buffer_a, cx_a, [(5..5, ", cruel")]);
-    edit_channel_buffer(&channel_buffer_a, cx_a, [(0..5, "goodbye")]);
-    undo_channel_buffer(&channel_buffer_a, cx_a);
+    let buffer_a = channel_buffer_a.read_with(cx_a, |buffer, _| buffer.buffer());
 
-    assert_eq!(
-        channel_buffer_text(&channel_buffer_a, cx_a),
-        "hello, cruel world"
-    );
+    edit_channel_buffer(&buffer_a, cx_a, [(0..0, "hello world")]);
+    edit_channel_buffer(&buffer_a, cx_a, [(5..5, ", cruel")]);
+    edit_channel_buffer(&buffer_a, cx_a, [(0..5, "goodbye")]);
+    undo_channel_buffer(&buffer_a, cx_a);
+
+    assert_eq!(channel_buffer_text(&buffer_a, cx_a), "hello, cruel world");
 
-    let b_document =
-        cx_b.add_model(|cx| ChannelBuffer::for_channel(zed_id, client_b.client().to_owned(), cx));
-    let channel_buffer_b = b_document
-        .update(cx_b, |doc, cx| doc.buffer(cx))
+    let channel_buffer_b = cx_b
+        .update(|cx| ChannelBuffer::for_channel(zed_id, client_b.client().to_owned(), cx))
         .await
         .unwrap();
 
-    assert_eq!(
-        channel_buffer_text(&channel_buffer_b, cx_b),
-        "hello, cruel world"
-    );
+    let buffer_b = channel_buffer_b.read_with(cx_b, |buffer, _| buffer.buffer());
+
+    assert_eq!(channel_buffer_text(&buffer_b, cx_b), "hello, cruel world");
 
-    edit_channel_buffer(&channel_buffer_b, cx_b, [(7..12, "beautiful")]);
+    edit_channel_buffer(&buffer_b, cx_b, [(7..12, "beautiful")]);
 
     deterministic.run_until_parked();
 
     assert_eq!(
-        channel_buffer_text(&channel_buffer_a, cx_a),
+        channel_buffer_text(&buffer_a, cx_a),
         "hello, beautiful world"
     );
     assert_eq!(
-        channel_buffer_text(&channel_buffer_b, cx_b),
+        channel_buffer_text(&buffer_b, cx_b),
         "hello, beautiful world"
     );
 }

crates/rpc/proto/zed.proto 🔗

@@ -143,8 +143,10 @@ message Envelope {
         SetChannelMemberAdmin set_channel_member_admin = 129;
         RenameChannel rename_channel = 130;
 
-        GetChannelBuffer get_channel_buffer = 131;
-        GetChannelBufferResponse get_channel_buffer_response = 132;
+        OpenChannelBuffer open_channel_buffer = 131;
+        OpenChannelBufferResponse open_channel_buffer_response = 132;
+        UpdateChannelBuffer update_channel_buffer = 133;
+        CloseChannelBuffer close_channel_buffer = 134;
     }
 }
 
@@ -543,6 +545,11 @@ message UpdateBuffer {
     repeated Operation operations = 3;
 }
 
+message UpdateChannelBuffer {
+    uint64 buffer_id = 2;
+    repeated Operation operations = 3;
+}
+
 message UpdateBufferFile {
     uint64 project_id = 1;
     uint64 buffer_id = 2;
@@ -951,13 +958,18 @@ message RenameChannel {
     string name = 2;
 }
 
-message GetChannelBuffer {
+message OpenChannelBuffer {
     uint64 channel_id = 1;
 }
 
-message GetChannelBufferResponse {
-    string base_text = 1;
-    repeated Operation operations = 2;
+message OpenChannelBufferResponse {
+    uint64 buffer_id = 1;
+    string base_text = 2;
+    repeated Operation operations = 3;
+}
+
+message CloseChannelBuffer {
+    uint64 buffer_id = 1;
 }
 
 message RespondToChannelInvite {
@@ -1156,7 +1168,6 @@ enum GitStatus {
     Conflict = 2;
 }
 
-
 message BufferState {
     uint64 id = 1;
     optional File file = 2;

crates/rpc/src/proto.rs 🔗

@@ -249,8 +249,10 @@ messages!(
     (GetPrivateUserInfoResponse, Foreground),
     (GetChannelMembers, Foreground),
     (GetChannelMembersResponse, Foreground),
-    (GetChannelBuffer, Foreground),
-    (GetChannelBufferResponse, Foreground)
+    (OpenChannelBuffer, Foreground),
+    (OpenChannelBufferResponse, Foreground),
+    (CloseChannelBuffer, Background),
+    (UpdateChannelBuffer, Foreground)
 );
 
 request_messages!(
@@ -317,7 +319,8 @@ request_messages!(
     (UpdateParticipantLocation, Ack),
     (UpdateProject, Ack),
     (UpdateWorktree, Ack),
-    (GetChannelBuffer, GetChannelBufferResponse)
+    (OpenChannelBuffer, OpenChannelBufferResponse),
+    (CloseChannelBuffer, Ack)
 );
 
 entity_messages!(
@@ -373,6 +376,8 @@ entity_messages!(
     UpdateDiffBase
 );
 
+entity_messages!(buffer_id, UpdateChannelBuffer);
+
 const KIB: usize = 1024;
 const MIB: usize = KIB * 1024;
 const MAX_BUFFER_LEN: usize = MIB;