Start work on showing consistent replica ids for channel buffers

Max Brunsfeld and Mikayla created

Co-authored-by: Mikayla <mikayla@zed.dev>

Change summary

Cargo.lock                                      |   1 
crates/channel/src/channel_buffer.rs            |   4 
crates/collab/Cargo.toml                        |   1 
crates/collab/src/tests/channel_buffer_tests.rs | 136 ++++++++++++++++++
crates/collab_ui/src/channel_view.rs            |  43 +++++
crates/collab_ui/src/collab_panel.rs            |   9 +
crates/editor/src/editor.rs                     |  11 +
crates/project/src/project.rs                   |   4 
8 files changed, 202 insertions(+), 7 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -1459,6 +1459,7 @@ dependencies = [
  "clap 3.2.25",
  "client",
  "clock",
+ "collab_ui",
  "collections",
  "ctor",
  "dashmap",

crates/channel/src/channel_buffer.rs 🔗

@@ -171,4 +171,8 @@ impl ChannelBuffer {
             .channel_for_id(self.channel_id)
             .cloned()
     }
+
+    pub fn replica_id(&self, cx: &AppContext) -> u16 {
+        self.buffer.read(cx).replica_id()
+    }
 }

crates/collab/Cargo.toml 🔗

@@ -78,6 +78,7 @@ rpc = { path = "../rpc", features = ["test-support"] }
 settings = { path = "../settings", features = ["test-support"] }
 theme = { path = "../theme" }
 workspace = { path = "../workspace", features = ["test-support"] }
+collab_ui = { path = "../collab_ui", features = ["test-support"] }
 
 ctor.workspace = true
 env_logger.workspace = true

crates/collab/src/tests/channel_buffer_tests.rs 🔗

@@ -1,8 +1,11 @@
 use crate::{rpc::RECONNECT_TIMEOUT, tests::TestServer};
-
+use call::ActiveCall;
 use client::UserId;
+use collab_ui::channel_view::ChannelView;
+use collections::HashMap;
 use gpui::{executor::Deterministic, ModelHandle, TestAppContext};
 use rpc::{proto, RECEIVE_TIMEOUT};
+use serde_json::json;
 use std::sync::Arc;
 
 #[gpui::test]
@@ -82,7 +85,9 @@ async fn test_core_channel_buffers(
     // Client A rejoins the channel buffer
     let _channel_buffer_a = client_a
         .channel_store()
-        .update(cx_a, |channels, cx| channels.open_channel_buffer(zed_id, cx))
+        .update(cx_a, |channels, cx| {
+            channels.open_channel_buffer(zed_id, cx)
+        })
         .await
         .unwrap();
     deterministic.run_until_parked();
@@ -110,6 +115,133 @@ async fn test_core_channel_buffers(
     // - Test interaction with channel deletion while buffer is open
 }
 
+#[gpui::test]
+async fn test_channel_buffer_replica_ids(
+    deterministic: Arc<Deterministic>,
+    cx_a: &mut TestAppContext,
+    cx_b: &mut TestAppContext,
+    cx_c: &mut TestAppContext,
+) {
+    deterministic.forbid_parking();
+    let mut server = TestServer::start(&deterministic).await;
+    let client_a = server.create_client(cx_a, "user_a").await;
+    let client_b = server.create_client(cx_b, "user_b").await;
+    let client_c = server.create_client(cx_c, "user_c").await;
+
+    let channel_id = server
+        .make_channel(
+            "zed",
+            (&client_a, cx_a),
+            &mut [(&client_b, cx_b), (&client_c, cx_c)],
+        )
+        .await;
+
+    let active_call_a = cx_a.read(ActiveCall::global);
+    let active_call_b = cx_b.read(ActiveCall::global);
+
+    // Clients A and B join a channel.
+    active_call_a
+        .update(cx_a, |call, cx| call.join_channel(channel_id, cx))
+        .await
+        .unwrap();
+    active_call_b
+        .update(cx_b, |call, cx| call.join_channel(channel_id, cx))
+        .await
+        .unwrap();
+
+    // Clients A, B, and C join a channel buffer
+    // C first so that the replica IDs in the project and the channel buffer are different
+    let channel_buffer_c = client_c
+        .channel_store()
+        .update(cx_c, |channel, cx| {
+            channel.open_channel_buffer(channel_id, cx)
+        })
+        .await
+        .unwrap();
+    let channel_buffer_b = client_b
+        .channel_store()
+        .update(cx_b, |channel, cx| {
+            channel.open_channel_buffer(channel_id, cx)
+        })
+        .await
+        .unwrap();
+    let channel_buffer_a = client_a
+        .channel_store()
+        .update(cx_a, |channel, cx| {
+            channel.open_channel_buffer(channel_id, cx)
+        })
+        .await
+        .unwrap();
+
+    // Client B shares a project
+    client_b
+        .fs()
+        .insert_tree("/dir", json!({ "file.txt": "contents" }))
+        .await;
+    let (project_b, _) = client_b.build_local_project("/dir", cx_b).await;
+    let shared_project_id = active_call_b
+        .update(cx_b, |call, cx| call.share_project(project_b.clone(), cx))
+        .await
+        .unwrap();
+
+    // Client A joins the project
+    let project_a = client_a.build_remote_project(shared_project_id, cx_a).await;
+    deterministic.run_until_parked();
+
+    // Client C is in a separate project.
+    client_c.fs().insert_tree("/dir", json!({})).await;
+    let (project_c, _) = client_c.build_local_project("/dir", cx_c).await;
+
+    // Note that each user has a different replica id in the projects vs the
+    // channel buffer.
+    channel_buffer_a.read_with(cx_a, |channel_buffer, cx| {
+        assert_eq!(project_a.read(cx).replica_id(), 1);
+        assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 2);
+    });
+    channel_buffer_b.read_with(cx_b, |channel_buffer, cx| {
+        assert_eq!(project_b.read(cx).replica_id(), 0);
+        assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 1);
+    });
+    channel_buffer_c.read_with(cx_c, |channel_buffer, cx| {
+        // C is not in the project
+        assert_eq!(channel_buffer.buffer().read(cx).replica_id(), 0);
+    });
+
+    let channel_window_a = cx_a
+        .add_window(|cx| ChannelView::new(project_a.clone(), channel_buffer_a.clone(), None, cx));
+    let channel_window_b = cx_b
+        .add_window(|cx| ChannelView::new(project_b.clone(), channel_buffer_b.clone(), None, cx));
+    let channel_window_c = cx_c
+        .add_window(|cx| ChannelView::new(project_c.clone(), channel_buffer_c.clone(), None, cx));
+
+    let channel_view_a = channel_window_a.root(cx_a);
+    let channel_view_b = channel_window_b.root(cx_b);
+    let channel_view_c = channel_window_c.root(cx_c);
+
+    // For clients A and B, the replica ids in the channel buffer are mapped
+    // so that they match the same users' replica ids in their shared project.
+    channel_view_a.read_with(cx_a, |view, cx| {
+        assert_eq!(
+            view.project_replica_ids_by_channel_buffer_replica_id(cx),
+            [(1, 0), (2, 1)].into_iter().collect::<HashMap<_, _>>()
+        );
+    });
+    channel_view_b.read_with(cx_b, |view, cx| {
+        assert_eq!(
+            view.project_replica_ids_by_channel_buffer_replica_id(cx),
+            [(1, 0), (2, 1)].into_iter().collect::<HashMap<u16, u16>>(),
+        )
+    });
+
+    // Client C only sees themself, as they're not part of any shared project
+    channel_view_c.read_with(cx_c, |view, cx| {
+        assert_eq!(
+            view.project_replica_ids_by_channel_buffer_replica_id(cx),
+            [(0, 0)].into_iter().collect::<HashMap<u16, u16>>(),
+        );
+    });
+}
+
 #[track_caller]
 fn assert_collaborators(collaborators: &[proto::Collaborator], ids: &[Option<UserId>]) {
     assert_eq!(

crates/collab_ui/src/channel_view.rs 🔗

@@ -1,4 +1,6 @@
 use channel::channel_buffer::ChannelBuffer;
+use clock::ReplicaId;
+use collections::HashMap;
 use editor::Editor;
 use gpui::{
     actions,
@@ -6,6 +8,7 @@ use gpui::{
     AnyElement, AppContext, Element, Entity, ModelHandle, View, ViewContext, ViewHandle,
 };
 use language::Language;
+use project::Project;
 use std::sync::Arc;
 use workspace::item::{Item, ItemHandle};
 
@@ -17,22 +20,56 @@ pub(crate) fn init(cx: &mut AppContext) {
 
 pub struct ChannelView {
     editor: ViewHandle<Editor>,
+    project: ModelHandle<Project>,
     channel_buffer: ModelHandle<ChannelBuffer>,
 }
 
 impl ChannelView {
     pub fn new(
+        project: ModelHandle<Project>,
         channel_buffer: ModelHandle<ChannelBuffer>,
-        language: Arc<Language>,
+        language: Option<Arc<Language>>,
         cx: &mut ViewContext<Self>,
     ) -> Self {
         let buffer = channel_buffer.read(cx).buffer();
-        buffer.update(cx, |buffer, cx| buffer.set_language(Some(language), cx));
+        buffer.update(cx, |buffer, cx| buffer.set_language(language, cx));
         let editor = cx.add_view(|cx| Editor::for_buffer(buffer, None, cx));
-        Self {
+        let this = Self {
             editor,
+            project,
             channel_buffer,
+        };
+        let mapping = this.project_replica_ids_by_channel_buffer_replica_id(cx);
+        this.editor
+            .update(cx, |editor, cx| editor.set_replica_id_mapping(mapping, cx));
+        this
+    }
+
+    /// Channel Buffer Replica ID -> Project Replica ID
+    pub fn project_replica_ids_by_channel_buffer_replica_id(
+        &self,
+        cx: &AppContext,
+    ) -> HashMap<ReplicaId, ReplicaId> {
+        let project = self.project.read(cx);
+        let mut result = HashMap::default();
+        result.insert(
+            self.channel_buffer.read(cx).replica_id(cx),
+            project.replica_id(),
+        );
+        for collaborator in self.channel_buffer.read(cx).collaborators() {
+            let project_replica_id =
+                project
+                    .collaborators()
+                    .values()
+                    .find_map(|project_collaborator| {
+                        (project_collaborator.user_id == collaborator.user_id)
+                            .then_some(project_collaborator.replica_id)
+                    });
+            if let Some(project_replica_id) = project_replica_id {
+                result.insert(collaborator.replica_id as ReplicaId, project_replica_id);
+            }
         }
+        result
     }
 }
 

crates/collab_ui/src/collab_panel.rs 🔗

@@ -2238,7 +2238,14 @@ impl CollabPanel {
                 .await?;
 
             workspace.update(&mut cx, |workspace, cx| {
-                let channel_view = cx.add_view(|cx| ChannelView::new(channel_buffer, markdown, cx));
+                let channel_view = cx.add_view(|cx| {
+                    ChannelView::new(
+                        workspace.project().to_owned(),
+                        channel_buffer,
+                        Some(markdown),
+                        cx,
+                    )
+                });
                 workspace.add_item(Box::new(channel_view), cx);
             })?;
 

crates/editor/src/editor.rs 🔗

@@ -559,6 +559,7 @@ pub struct Editor {
     blink_manager: ModelHandle<BlinkManager>,
     show_local_selections: bool,
     mode: EditorMode,
+    replica_id_mapping: Option<HashMap<ReplicaId, ReplicaId>>,
     show_gutter: bool,
     show_wrap_guides: Option<bool>,
     placeholder_text: Option<Arc<str>>,
@@ -1394,6 +1395,7 @@ impl Editor {
             blink_manager: blink_manager.clone(),
             show_local_selections: true,
             mode,
+            replica_id_mapping: None,
             show_gutter: mode == EditorMode::Full,
             show_wrap_guides: None,
             placeholder_text: None,
@@ -1604,6 +1606,15 @@ impl Editor {
         self.read_only = read_only;
     }
 
+    pub fn set_replica_id_mapping(
+        &mut self,
+        mapping: HashMap<ReplicaId, ReplicaId>,
+        cx: &mut ViewContext<Self>,
+    ) {
+        self.replica_id_mapping = Some(mapping);
+        cx.notify();
+    }
+
     fn selections_did_change(
         &mut self,
         local: bool,

crates/project/src/project.rs 🔗

@@ -11,7 +11,7 @@ mod project_tests;
 mod worktree_tests;
 
 use anyhow::{anyhow, Context, Result};
-use client::{proto, Client, TypedEnvelope, UserStore};
+use client::{proto, Client, TypedEnvelope, UserId, UserStore};
 use clock::ReplicaId;
 use collections::{hash_map, BTreeMap, HashMap, HashSet};
 use copilot::Copilot;
@@ -250,6 +250,7 @@ enum ProjectClientState {
 pub struct Collaborator {
     pub peer_id: proto::PeerId,
     pub replica_id: ReplicaId,
+    pub user_id: UserId,
 }
 
 #[derive(Clone, Debug, PartialEq)]
@@ -7756,6 +7757,7 @@ impl Collaborator {
         Ok(Self {
             peer_id: message.peer_id.ok_or_else(|| anyhow!("invalid peer id"))?,
             replica_id: message.replica_id as ReplicaId,
+            user_id: message.user_id as UserId,
         })
     }
 }