Achieve end to end channel buffer synchronization

Mikayla and max created

co-authored-by: max <max@zed.dev>

Change summary

crates/channel/src/channel.rs                   |   7 
crates/channel/src/channel_buffer.rs            |  91 ++++++++-
crates/collab/src/db/queries/buffers.rs         | 163 +++++++++++++-----
crates/collab/src/db/tests/buffer_tests.rs      |  28 ++
crates/collab/src/rpc.rs                        | 105 ++++++++++-
crates/collab/src/tests.rs                      |   1 
crates/collab/src/tests/channel_buffer_tests.rs | 117 +++++++++----
crates/rpc/proto/zed.proto                      |  23 ++
crates/rpc/src/proto.rs                         |  11 +
crates/zed/src/main.rs                          |   1 
10 files changed, 422 insertions(+), 125 deletions(-)

Detailed changes

crates/channel/src/channel.rs 🔗

@@ -1,7 +1,14 @@
 mod channel_store;
 
 pub mod channel_buffer;
+use std::sync::Arc;
+
 pub use channel_store::*;
+use client::Client;
 
 #[cfg(test)]
 mod channel_store_tests;
+
+pub fn init(client: &Arc<Client>) {
+    channel_buffer::init(client);
+}

crates/channel/src/channel_buffer.rs 🔗

@@ -6,30 +6,34 @@ use rpc::{proto, TypedEnvelope};
 use std::sync::Arc;
 use util::ResultExt;
 
-// Open the channel document
-// ChannelDocumentView { ChannelDocument, Editor } -> On clone, clones internal ChannelDocument handle, instantiates new editor
-// Produces a view which is: (ChannelDocument, Editor), ChannelDocument manages subscriptions
-// ChannelDocuments -> Buffers -> Editor with that buffer
-
-// ChannelDocuments {
-//     ChannleBuffers: HashMap<bufferId, ModelHandle<language::Buffer>>
-// }
-
-type BufferId = u64;
+pub(crate) fn init(client: &Arc<Client>) {
+    client.add_model_message_handler(ChannelBuffer::handle_update_channel_buffer);
+    client.add_model_message_handler(ChannelBuffer::handle_add_channel_buffer_collaborator);
+    client.add_model_message_handler(ChannelBuffer::handle_remove_channel_buffer_collaborator);
+}
 
 pub struct ChannelBuffer {
     channel_id: ChannelId,
-    buffer_id: BufferId,
+    collaborators: Vec<proto::Collaborator>,
     buffer: ModelHandle<language::Buffer>,
     client: Arc<Client>,
+    _subscription: client::Subscription,
 }
 
 impl Entity for ChannelBuffer {
     type Event = ();
+
+    fn release(&mut self, _: &mut AppContext) {
+        self.client
+            .send(proto::LeaveChannelBuffer {
+                channel_id: self.channel_id,
+            })
+            .log_err();
+    }
 }
 
 impl ChannelBuffer {
-    pub fn for_channel(
+    pub fn join_channel(
         channel_id: ChannelId,
         client: Arc<Client>,
         cx: &mut AppContext,
@@ -45,19 +49,24 @@ impl ChannelBuffer {
                 .into_iter()
                 .map(language::proto::deserialize_operation)
                 .collect::<Result<Vec<_>, _>>()?;
-            let buffer_id = response.buffer_id;
 
-            let buffer = cx.add_model(|cx| language::Buffer::new(0, base_text, cx));
+            let collaborators = response.collaborators;
+
+            let buffer =
+                cx.add_model(|cx| language::Buffer::new(response.replica_id as u16, base_text, cx));
             buffer.update(&mut cx, |buffer, cx| buffer.apply_ops(operations, cx))?;
 
+            let subscription = client.subscribe_to_entity(channel_id)?;
+
             anyhow::Ok(cx.add_model(|cx| {
                 cx.subscribe(&buffer, Self::on_buffer_update).detach();
-                client.add_model_message_handler(Self::handle_update_channel_buffer);
+
                 Self {
-                    buffer_id,
                     buffer,
                     client,
                     channel_id,
+                    collaborators,
+                    _subscription: subscription.set_model(&cx.handle(), &mut cx.to_async()),
                 }
             }))
         })
@@ -77,6 +86,7 @@ impl ChannelBuffer {
             .collect::<Result<Vec<_>, _>>()?;
 
         this.update(&mut cx, |this, cx| {
+            cx.notify();
             this.buffer
                 .update(cx, |buffer, cx| buffer.apply_ops(ops, cx))
         })?;
@@ -84,6 +94,49 @@ impl ChannelBuffer {
         Ok(())
     }
 
+    async fn handle_add_channel_buffer_collaborator(
+        this: ModelHandle<Self>,
+        envelope: TypedEnvelope<proto::AddChannelBufferCollaborator>,
+        _: Arc<Client>,
+        mut cx: AsyncAppContext,
+    ) -> Result<()> {
+        let collaborator = envelope.payload.collaborator.ok_or_else(|| {
+            anyhow::anyhow!(
+                "Should have gotten a collaborator in the AddChannelBufferCollaborator message"
+            )
+        })?;
+
+        this.update(&mut cx, |this, cx| {
+            this.collaborators.push(collaborator);
+            cx.notify();
+        });
+
+        Ok(())
+    }
+
+    async fn handle_remove_channel_buffer_collaborator(
+        this: ModelHandle<Self>,
+        message: TypedEnvelope<proto::RemoveChannelBufferCollaborator>,
+        _: Arc<Client>,
+        mut cx: AsyncAppContext,
+    ) -> Result<()> {
+        this.update(&mut cx, |this, cx| {
+            this.collaborators.retain(|collaborator| {
+                if collaborator.peer_id == message.payload.peer_id {
+                    this.buffer.update(cx, |buffer, cx| {
+                        buffer.remove_peer(collaborator.replica_id as u16, cx)
+                    });
+                    false
+                } else {
+                    true
+                }
+            });
+            cx.notify();
+        });
+
+        Ok(())
+    }
+
     fn on_buffer_update(
         &mut self,
         _: ModelHandle<language::Buffer>,
@@ -94,7 +147,7 @@ impl ChannelBuffer {
             let operation = language::proto::serialize_operation(operation);
             self.client
                 .send(proto::UpdateChannelBuffer {
-                    buffer_id: self.buffer_id,
+                    channel_id: self.channel_id,
                     operations: vec![operation],
                 })
                 .log_err();
@@ -104,4 +157,8 @@ impl ChannelBuffer {
     pub fn buffer(&self) -> ModelHandle<language::Buffer> {
         self.buffer.clone()
     }
+
+    pub fn collaborators(&self) -> &[proto::Collaborator] {
+        &self.collaborators
+    }
 }

crates/collab/src/db/queries/buffers.rs 🔗

@@ -11,7 +11,6 @@ impl Database {
         self.transaction(|tx| async move {
             let tx = tx;
 
-            // Get or create buffer from channel
             self.check_user_is_channel_member(channel_id, user_id, &tx)
                 .await?;
 
@@ -116,6 +115,7 @@ impl Database {
 
             Ok(proto::JoinChannelBufferResponse {
                 buffer_id: buffer.id.to_proto(),
+                replica_id: replica_id.to_proto() as u32,
                 base_text,
                 operations,
                 collaborators: collaborators
@@ -137,67 +137,128 @@ impl Database {
         connection: ConnectionId,
     ) -> Result<Vec<ConnectionId>> {
         self.transaction(|tx| async move {
-            let result = channel_buffer_collaborator::Entity::delete_many()
-                .filter(
-                    Condition::all()
-                        .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
-                        .add(
-                            channel_buffer_collaborator::Column::ConnectionId
-                                .eq(connection.id as i32),
-                        )
-                        .add(
-                            channel_buffer_collaborator::Column::ConnectionServerId
-                                .eq(connection.owner_id as i32),
-                        ),
-                )
-                .exec(&*tx)
-                .await?;
-            if result.rows_affected == 0 {
-                Err(anyhow!("not a collaborator on this project"))?;
+            self.leave_channel_buffer_internal(channel_id, connection, &*tx)
+                .await
+        })
+        .await
+    }
+
+    pub async fn leave_channel_buffer_internal(
+        &self,
+        channel_id: ChannelId,
+        connection: ConnectionId,
+        tx: &DatabaseTransaction,
+    ) -> Result<Vec<ConnectionId>> {
+        let result = channel_buffer_collaborator::Entity::delete_many()
+            .filter(
+                Condition::all()
+                    .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id))
+                    .add(channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32))
+                    .add(
+                        channel_buffer_collaborator::Column::ConnectionServerId
+                            .eq(connection.owner_id as i32),
+                    ),
+            )
+            .exec(&*tx)
+            .await?;
+        if result.rows_affected == 0 {
+            Err(anyhow!("not a collaborator on this project"))?;
+        }
+
+        let mut connections = Vec::new();
+        let mut rows = channel_buffer_collaborator::Entity::find()
+            .filter(
+                Condition::all().add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
+            )
+            .stream(&*tx)
+            .await?;
+        while let Some(row) = rows.next().await {
+            let row = row?;
+            connections.push(ConnectionId {
+                id: row.connection_id as u32,
+                owner_id: row.connection_server_id.0 as u32,
+            });
+        }
+
+        Ok(connections)
+    }
+
+    pub async fn leave_channel_buffers(
+        &self,
+        connection: ConnectionId,
+    ) -> Result<Vec<(ChannelId, Vec<ConnectionId>)>> {
+        self.transaction(|tx| async move {
+            #[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
+            enum QueryChannelIds {
+                ChannelId,
             }
 
-            let mut connections = Vec::new();
-            let mut rows = channel_buffer_collaborator::Entity::find()
-                .filter(
-                    Condition::all()
-                        .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
-                )
-                .stream(&*tx)
+            let channel_ids: Vec<ChannelId> = channel_buffer_collaborator::Entity::find()
+                .select_only()
+                .column(channel_buffer_collaborator::Column::ChannelId)
+                .filter(Condition::all().add(
+                    channel_buffer_collaborator::Column::ConnectionId.eq(connection.id as i32),
+                ))
+                .into_values::<_, QueryChannelIds>()
+                .all(&*tx)
                 .await?;
-            while let Some(row) = rows.next().await {
-                let row = row?;
-                connections.push(ConnectionId {
-                    id: row.connection_id as u32,
-                    owner_id: row.connection_server_id.0 as u32,
-                });
+
+            let mut result = Vec::new();
+            for channel_id in channel_ids {
+                let collaborators = self
+                    .leave_channel_buffer_internal(channel_id, connection, &*tx)
+                    .await?;
+                result.push((channel_id, collaborators));
             }
 
-            Ok(connections)
+            Ok(result)
         })
         .await
     }
 
-    pub async fn leave_channel_buffers(
+    #[cfg(debug_assertions)]
+    pub async fn get_channel_buffer_collaborators(
         &self,
-        connection: ConnectionId,
-    ) -> Result<Option<LeftChannelBuffers>> {
-        //
-    }
+        channel_id: ChannelId,
+    ) -> Result<Vec<UserId>> {
+        self.transaction(|tx| async move {
+            #[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)]
+            enum QueryUserIds {
+                UserId,
+            }
 
-    pub async fn get_channel_buffer_collaborators(&self, channel_id: ChannelId) -> Result<()> {
-        todo!()
+            let users: Vec<UserId> = channel_buffer_collaborator::Entity::find()
+                .select_only()
+                .column(channel_buffer_collaborator::Column::UserId)
+                .filter(
+                    Condition::all()
+                        .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
+                )
+                .into_values::<_, QueryUserIds>()
+                .all(&*tx)
+                .await?;
+
+            Ok(users)
+        })
+        .await
     }
 
     pub async fn update_channel_buffer(
         &self,
-        buffer_id: BufferId,
+        channel_id: ChannelId,
+        user: UserId,
         operations: &[proto::Operation],
-    ) -> Result<()> {
+    ) -> Result<Vec<ConnectionId>> {
         self.transaction(|tx| async move {
-            let buffer = buffer::Entity::find_by_id(buffer_id)
+            self.check_user_is_channel_member(channel_id, user, &*tx)
+                .await?;
+
+            let buffer = buffer::Entity::find()
+                .filter(buffer::Column::ChannelId.eq(channel_id))
                 .one(&*tx)
                 .await?
                 .ok_or_else(|| anyhow!("no such buffer"))?;
+            let buffer_id = buffer.id;
             buffer_operation::Entity::insert_many(operations.iter().filter_map(|operation| {
                 match operation.variant.as_ref()? {
                     proto::operation::Variant::Edit(operation) => {
@@ -237,7 +298,23 @@ impl Database {
             .exec(&*tx)
             .await?;
 
-            Ok(())
+            let mut connections = Vec::new();
+            let mut rows = channel_buffer_collaborator::Entity::find()
+                .filter(
+                    Condition::all()
+                        .add(channel_buffer_collaborator::Column::ChannelId.eq(channel_id)),
+                )
+                .stream(&*tx)
+                .await?;
+            while let Some(row) = rows.next().await {
+                let row = row?;
+                connections.push(ConnectionId {
+                    id: row.connection_id as u32,
+                    owner_id: row.connection_server_id.0 as u32,
+                });
+            }
+
+            Ok(connections)
         })
         .await
     }

crates/collab/src/db/tests/buffer_tests.rs 🔗

@@ -66,11 +66,10 @@ async fn test_channel_buffers(db: &Arc<Database>) {
         .unwrap();
 
     let connection_id_a = ConnectionId { owner_id, id: 1 };
-    let buffer_response_a = db
+    let _ = db
         .join_channel_buffer(zed_id, a_id, connection_id_a)
         .await
         .unwrap();
-    let buffer_id = BufferId::from_proto(buffer_response_a.buffer_id);
 
     let mut buffer_a = Buffer::new(0, 0, "".to_string());
     let mut operations = Vec::new();
@@ -85,7 +84,7 @@ async fn test_channel_buffers(db: &Arc<Database>) {
         .map(|op| proto::serialize_operation(&language::Operation::Buffer(op)))
         .collect::<Vec<_>>();
 
-    db.update_channel_buffer(buffer_id, &operations)
+    db.update_channel_buffer(zed_id, a_id, &operations)
         .await
         .unwrap();
 
@@ -115,7 +114,7 @@ async fn test_channel_buffers(db: &Arc<Database>) {
         .await
         .is_err());
 
-    //Ensure that both collaborators have shown up
+    // Ensure that both collaborators have shown up
     assert_eq!(
         buffer_response_b.collaborators,
         &[
@@ -132,6 +131,10 @@ async fn test_channel_buffers(db: &Arc<Database>) {
         ]
     );
 
+    // Ensure that get_channel_buffer_collaborators works
+    let zed_collaborats = db.get_channel_buffer_collaborators(zed_id).await.unwrap();
+    assert_eq!(zed_collaborats, &[a_id, b_id]);
+
     let collaborators = db
         .leave_channel_buffer(zed_id, connection_id_b)
         .await
@@ -139,7 +142,18 @@ async fn test_channel_buffers(db: &Arc<Database>) {
 
     assert_eq!(collaborators, &[connection_id_a],);
 
-    db.connection_lost(connection_id_a).await.unwrap();
-    // assert!()
-    // Test buffer epoch incrementing?
+    let cargo_id = db.create_root_channel("cargo", "2", a_id).await.unwrap();
+    let _ = db
+        .join_channel_buffer(cargo_id, a_id, connection_id_a)
+        .await
+        .unwrap();
+
+    db.leave_channel_buffers(connection_id_a).await.unwrap();
+
+    let zed_collaborators = db.get_channel_buffer_collaborators(zed_id).await.unwrap();
+    let cargo_collaborators = db.get_channel_buffer_collaborators(cargo_id).await.unwrap();
+    assert_eq!(zed_collaborators, &[]);
+    assert_eq!(cargo_collaborators, &[]);
+
+    // TODO: test buffer epoch incrementing
 }

crates/collab/src/rpc.rs 🔗

@@ -2,10 +2,7 @@ mod connection_pool;
 
 use crate::{
     auth,
-    db::{
-        self, BufferId, ChannelId, ChannelsForUser, Database, ProjectId, RoomId, ServerId, User,
-        UserId,
-    },
+    db::{self, ChannelId, ChannelsForUser, Database, ProjectId, RoomId, ServerId, User, UserId},
     executor::Executor,
     AppState, Result,
 };
@@ -38,8 +35,8 @@ use lazy_static::lazy_static;
 use prometheus::{register_int_gauge, IntGauge};
 use rpc::{
     proto::{
-        self, Ack, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, LiveKitConnectionInfo,
-        RequestMessage,
+        self, Ack, AddChannelBufferCollaborator, AnyTypedEnvelope, EntityMessage, EnvelopedMessage,
+        LiveKitConnectionInfo, RequestMessage,
     },
     Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
 };
@@ -860,6 +857,7 @@ async fn connection_lost(
     futures::select_biased! {
         _ = executor.sleep(RECONNECT_TIMEOUT).fuse() => {
             leave_room_for_session(&session).await.trace_err();
+            leave_channel_buffers_for_session(&session).await.trace_err();
 
             if !session
                 .connection_pool()
@@ -872,6 +870,8 @@ async fn connection_lost(
                 }
             }
             update_user_contacts(session.user_id, &session).await?;
+
+
         }
         _ = teardown.changed().fuse() => {}
     }
@@ -2496,8 +2496,51 @@ async fn join_channel_buffer(
         .join_channel_buffer(channel_id, session.user_id, session.connection_id)
         .await?;
 
+    let replica_id = open_response.replica_id;
+    let collaborators = open_response.collaborators.clone();
+
     response.send(open_response)?;
 
+    let update = AddChannelBufferCollaborator {
+        channel_id: channel_id.to_proto(),
+        collaborator: Some(proto::Collaborator {
+            user_id: session.user_id.to_proto(),
+            peer_id: Some(session.connection_id.into()),
+            replica_id,
+        }),
+    };
+    channel_buffer_updated(
+        session.connection_id,
+        collaborators
+            .iter()
+            .filter_map(|collaborator| Some(collaborator.peer_id?.into())),
+        &update,
+        &session.peer,
+    );
+
+    Ok(())
+}
+
+async fn update_channel_buffer(
+    request: proto::UpdateChannelBuffer,
+    session: Session,
+) -> Result<()> {
+    let db = session.db().await;
+    let channel_id = ChannelId::from_proto(request.channel_id);
+
+    let collaborators = db
+        .update_channel_buffer(channel_id, session.user_id, &request.operations)
+        .await?;
+
+    channel_buffer_updated(
+        session.connection_id,
+        collaborators,
+        &proto::UpdateChannelBuffer {
+            channel_id: channel_id.to_proto(),
+            operations: request.operations,
+        },
+        &session.peer,
+    );
     Ok(())
 }
 
@@ -2515,18 +2558,28 @@ async fn leave_channel_buffer(
 
     response.send(Ack {})?;
 
+    channel_buffer_updated(
+        session.connection_id,
+        collaborators_to_notify,
+        &proto::RemoveChannelBufferCollaborator {
+            channel_id: channel_id.to_proto(),
+            peer_id: Some(session.connection_id.into()),
+        },
+        &session.peer,
+    );
+
     Ok(())
 }
 
-async fn update_channel_buffer(
-    request: proto::UpdateChannelBuffer,
-    session: Session,
-) -> Result<()> {
-    let db = session.db().await;
-
-    // TODO: Broadcast to buffer members
-
-    Ok(())
+fn channel_buffer_updated<T: EnvelopedMessage>(
+    sender_id: ConnectionId,
+    collaborators: impl IntoIterator<Item = ConnectionId>,
+    message: &T,
+    peer: &Peer,
+) {
+    broadcast(Some(sender_id), collaborators.into_iter(), |peer_id| {
+        peer.send(peer_id.into(), message.clone())
+    });
 }
 
 async fn update_diff_base(request: proto::UpdateDiffBase, session: Session) -> Result<()> {
@@ -2854,6 +2907,28 @@ async fn leave_room_for_session(session: &Session) -> Result<()> {
     Ok(())
 }
 
+async fn leave_channel_buffers_for_session(session: &Session) -> Result<()> {
+    let left_channel_buffers = session
+        .db()
+        .await
+        .leave_channel_buffers(session.connection_id)
+        .await?;
+
+    for (channel_id, connections) in left_channel_buffers {
+        channel_buffer_updated(
+            session.connection_id,
+            connections,
+            &proto::RemoveChannelBufferCollaborator {
+                channel_id: channel_id.to_proto(),
+                peer_id: Some(session.connection_id.into()),
+            },
+            &session.peer,
+        );
+    }
+
+    Ok(())
+}
+
 fn project_left(project: &db::LeftProject, session: &Session) {
     for connection_id in &project.connection_ids {
         if project.host_user_id == session.user_id {

crates/collab/src/tests.rs 🔗

@@ -211,6 +211,7 @@ impl TestServer {
             workspace::init(app_state.clone(), cx);
             audio::init((), cx);
             call::init(client.clone(), user_store.clone(), cx);
+            channel::init(&client);
         });
 
         client

crates/collab/src/tests/channel_buffer_tests.rs 🔗

@@ -1,11 +1,13 @@
-use crate::tests::TestServer;
+use crate::{rpc::RECONNECT_TIMEOUT, tests::TestServer};
 
 use channel::channel_buffer::ChannelBuffer;
+use client::UserId;
 use gpui::{executor::Deterministic, ModelHandle, TestAppContext};
-use std::{ops::Range, sync::Arc};
+use rpc::{proto, RECEIVE_TIMEOUT};
+use std::sync::Arc;
 
 #[gpui::test]
-async fn test_channel_buffers(
+async fn test_core_channel_buffers(
     deterministic: Arc<Deterministic>,
     cx_a: &mut TestAppContext,
     cx_b: &mut TestAppContext,
@@ -19,60 +21,103 @@ async fn test_channel_buffers(
         .make_channel("zed", (&client_a, cx_a), &mut [(&client_b, cx_b)])
         .await;
 
+    // Client A joins the channel buffer
     let channel_buffer_a = cx_a
-        .update(|cx| ChannelBuffer::for_channel(zed_id, client_a.client().to_owned(), cx))
+        .update(|cx| ChannelBuffer::join_channel(zed_id, client_a.client().to_owned(), cx))
         .await
         .unwrap();
 
+    // Client A edits the buffer
     let buffer_a = channel_buffer_a.read_with(cx_a, |buffer, _| buffer.buffer());
 
-    edit_channel_buffer(&buffer_a, cx_a, [(0..0, "hello world")]);
-    edit_channel_buffer(&buffer_a, cx_a, [(5..5, ", cruel")]);
-    edit_channel_buffer(&buffer_a, cx_a, [(0..5, "goodbye")]);
-    undo_channel_buffer(&buffer_a, cx_a);
+    buffer_a.update(cx_a, |buffer, cx| {
+        buffer.edit([(0..0, "hello world")], None, cx)
+    });
+    buffer_a.update(cx_a, |buffer, cx| {
+        buffer.edit([(5..5, ", cruel")], None, cx)
+    });
+    buffer_a.update(cx_a, |buffer, cx| {
+        buffer.edit([(0..5, "goodbye")], None, cx)
+    });
+    buffer_a.update(cx_a, |buffer, cx| buffer.undo(cx));
+    deterministic.run_until_parked();
 
-    assert_eq!(channel_buffer_text(&buffer_a, cx_a), "hello, cruel world");
+    assert_eq!(buffer_text(&buffer_a, cx_a), "hello, cruel world");
 
+    // Client B joins the channel buffer
     let channel_buffer_b = cx_b
-        .update(|cx| ChannelBuffer::for_channel(zed_id, client_b.client().to_owned(), cx))
+        .update(|cx| ChannelBuffer::join_channel(zed_id, client_b.client().to_owned(), cx))
         .await
         .unwrap();
 
-    let buffer_b = channel_buffer_b.read_with(cx_b, |buffer, _| buffer.buffer());
+    channel_buffer_b.read_with(cx_b, |buffer, _| {
+        assert_collaborators(
+            buffer.collaborators(),
+            &[client_a.user_id(), client_b.user_id()],
+        );
+    });
 
-    assert_eq!(channel_buffer_text(&buffer_b, cx_b), "hello, cruel world");
+    // Client B sees the correct text, and then edits it
+    let buffer_b = channel_buffer_b.read_with(cx_b, |buffer, _| buffer.buffer());
+    assert_eq!(buffer_text(&buffer_b, cx_b), "hello, cruel world");
+    buffer_b.update(cx_b, |buffer, cx| {
+        buffer.edit([(7..12, "beautiful")], None, cx)
+    });
 
-    edit_channel_buffer(&buffer_b, cx_b, [(7..12, "beautiful")]);
+    // Both A and B see the new edit
+    deterministic.run_until_parked();
+    assert_eq!(buffer_text(&buffer_a, cx_a), "hello, beautiful world");
+    assert_eq!(buffer_text(&buffer_b, cx_b), "hello, beautiful world");
 
+    // Client A closes the channel buffer.
+    cx_a.update(|_| drop(channel_buffer_a));
     deterministic.run_until_parked();
 
-    assert_eq!(
-        channel_buffer_text(&buffer_a, cx_a),
-        "hello, beautiful world"
-    );
-    assert_eq!(
-        channel_buffer_text(&buffer_b, cx_b),
-        "hello, beautiful world"
-    );
-}
+    // Client B sees that client A is gone from the channel buffer.
+    channel_buffer_b.read_with(cx_b, |buffer, _| {
+        assert_collaborators(&buffer.collaborators(), &[client_b.user_id()]);
+    });
 
-fn edit_channel_buffer<I>(
-    channel_buffer: &ModelHandle<language::Buffer>,
-    cx: &mut TestAppContext,
-    edits: I,
-) where
-    I: IntoIterator<Item = (Range<usize>, &'static str)>,
-{
-    channel_buffer.update(cx, |buffer, cx| buffer.edit(edits, None, cx));
+    // Client A rejoins the channel buffer
+    let _channel_buffer_a = cx_a
+        .update(|cx| ChannelBuffer::join_channel(zed_id, client_a.client().to_owned(), cx))
+        .await
+        .unwrap();
+    deterministic.run_until_parked();
+
+    // Sanity test, make sure we saw A rejoining
+    channel_buffer_b.read_with(cx_b, |buffer, _| {
+        assert_collaborators(
+            &buffer.collaborators(),
+            &[client_b.user_id(), client_a.user_id()],
+        );
+    });
+
+    // Client A loses connection.
+    server.forbid_connections();
+    server.disconnect_client(client_a.peer_id().unwrap());
+    deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
+
+    // Client B observes A disconnect
+    channel_buffer_b.read_with(cx_b, |buffer, _| {
+        assert_collaborators(&buffer.collaborators(), &[client_b.user_id()]);
+    });
+
+    // TODO:
+    // - Test synchronizing offline updates, what happens to A's channel buffer?
 }
 
-fn undo_channel_buffer(channel_buffer: &ModelHandle<language::Buffer>, cx: &mut TestAppContext) {
-    channel_buffer.update(cx, |buffer, cx| buffer.undo(cx));
+#[track_caller]
+fn assert_collaborators(collaborators: &[proto::Collaborator], ids: &[Option<UserId>]) {
+    assert_eq!(
+        collaborators
+            .into_iter()
+            .map(|collaborator| collaborator.user_id)
+            .collect::<Vec<_>>(),
+        ids.into_iter().map(|id| id.unwrap()).collect::<Vec<_>>()
+    );
 }
 
-fn channel_buffer_text(
-    channel_buffer: &ModelHandle<language::Buffer>,
-    cx: &mut TestAppContext,
-) -> String {
+fn buffer_text(channel_buffer: &ModelHandle<language::Buffer>, cx: &mut TestAppContext) -> String {
     channel_buffer.read_with(cx, |buffer, _| buffer.text())
 }

crates/rpc/proto/zed.proto 🔗

@@ -147,6 +147,8 @@ message Envelope {
         JoinChannelBufferResponse join_channel_buffer_response = 132;
         UpdateChannelBuffer update_channel_buffer = 133;
         LeaveChannelBuffer leave_channel_buffer = 134;
+        AddChannelBufferCollaborator add_channel_buffer_collaborator = 135;
+        RemoveChannelBufferCollaborator remove_channel_buffer_collaborator = 136;
     }
 }
 
@@ -416,6 +418,16 @@ message RemoveProjectCollaborator {
     PeerId peer_id = 2;
 }
 
+message AddChannelBufferCollaborator {
+    uint64 channel_id = 1;
+    Collaborator collaborator = 2;
+}
+
+message RemoveChannelBufferCollaborator {
+    uint64 channel_id = 1;
+    PeerId peer_id = 2;
+}
+
 message GetDefinition {
      uint64 project_id = 1;
      uint64 buffer_id = 2;
@@ -546,8 +558,8 @@ message UpdateBuffer {
 }
 
 message UpdateChannelBuffer {
-    uint64 buffer_id = 2;
-    repeated Operation operations = 3;
+    uint64 channel_id = 1;
+    repeated Operation operations = 2;
 }
 
 message UpdateBufferFile {
@@ -964,9 +976,10 @@ message JoinChannelBuffer {
 
 message JoinChannelBufferResponse {
     uint64 buffer_id = 1;
-    string base_text = 2;
-    repeated Operation operations = 3;
-    repeated Collaborator collaborators = 4;
+    uint32 replica_id = 2;
+    string base_text = 3;
+    repeated Operation operations = 4;
+    repeated Collaborator collaborators = 5;
 }
 
 message LeaveChannelBuffer {

crates/rpc/src/proto.rs 🔗

@@ -252,7 +252,9 @@ messages!(
     (JoinChannelBuffer, Foreground),
     (JoinChannelBufferResponse, Foreground),
     (LeaveChannelBuffer, Background),
-    (UpdateChannelBuffer, Foreground)
+    (UpdateChannelBuffer, Foreground),
+    (RemoveChannelBufferCollaborator, Foreground),
+    (AddChannelBufferCollaborator, Foreground),
 );
 
 request_messages!(
@@ -376,7 +378,12 @@ entity_messages!(
     UpdateDiffBase
 );
 
-entity_messages!(buffer_id, UpdateChannelBuffer);
+entity_messages!(
+    channel_id,
+    UpdateChannelBuffer,
+    RemoveChannelBufferCollaborator,
+    AddChannelBufferCollaborator
+);
 
 const KIB: usize = 1024;
 const MIB: usize = KIB * 1024;

crates/zed/src/main.rs 🔗

@@ -158,6 +158,7 @@ fn main() {
         outline::init(cx);
         project_symbols::init(cx);
         project_panel::init(Assets, cx);
+        channel::init(&client);
         diagnostics::init(cx);
         search::init(cx);
         semantic_index::init(fs.clone(), http.clone(), languages.clone(), cx);