Implement `Room::set_location`

Antonio Scandurra created

Change summary

crates/call/src/call.rs                |   1 
crates/call/src/participant.rs         |   1 
crates/call/src/room.rs                |  37 ++++++
crates/collab/src/integration_tests.rs | 162 +++++++++++++++++++++++++++
crates/collab/src/rpc.rs               |  18 +++
crates/collab/src/rpc/store.rs         |  31 +++++
crates/rpc/proto/zed.proto             |   6 +
crates/rpc/src/proto.rs                |   2 
8 files changed, 256 insertions(+), 2 deletions(-)

Detailed changes

crates/call/src/call.rs 🔗

@@ -4,6 +4,7 @@ pub mod room;
 use anyhow::{anyhow, Result};
 use client::{incoming_call::IncomingCall, Client, UserStore};
 use gpui::{Entity, ModelContext, ModelHandle, MutableAppContext, Task};
+pub use participant::ParticipantLocation;
 pub use room::Room;
 use std::sync::Arc;
 

crates/call/src/participant.rs 🔗

@@ -2,6 +2,7 @@ use anyhow::{anyhow, Result};
 use client::{proto, User};
 use std::sync::Arc;
 
+#[derive(Copy, Clone, Debug, Eq, PartialEq)]
 pub enum ParticipantLocation {
     Project { project_id: u64 },
     External,

crates/call/src/room.rs 🔗

@@ -4,6 +4,7 @@ use client::{incoming_call::IncomingCall, proto, Client, PeerId, TypedEnvelope,
 use collections::{HashMap, HashSet};
 use futures::StreamExt;
 use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task};
+use project::Project;
 use std::sync::Arc;
 use util::ResultExt;
 
@@ -233,6 +234,42 @@ impl Room {
             Ok(())
         })
     }
+
+    pub fn set_location(
+        &mut self,
+        project: Option<&ModelHandle<Project>>,
+        cx: &mut ModelContext<Self>,
+    ) -> Task<Result<()>> {
+        if self.status.is_offline() {
+            return Task::ready(Err(anyhow!("room is offline")));
+        }
+
+        let client = self.client.clone();
+        let room_id = self.id;
+        let location = if let Some(project) = project {
+            if let Some(project_id) = project.read(cx).remote_id() {
+                proto::participant_location::Variant::Project(
+                    proto::participant_location::Project { id: project_id },
+                )
+            } else {
+                return Task::ready(Err(anyhow!("project is not shared")));
+            }
+        } else {
+            proto::participant_location::Variant::External(proto::participant_location::External {})
+        };
+
+        cx.foreground().spawn(async move {
+            client
+                .request(proto::UpdateParticipantLocation {
+                    room_id,
+                    location: Some(proto::ParticipantLocation {
+                        variant: Some(location),
+                    }),
+                })
+                .await?;
+            Ok(())
+        })
+    }
 }
 
 #[derive(Copy, Clone, PartialEq, Eq)]

crates/collab/src/integration_tests.rs 🔗

@@ -5,7 +5,7 @@ use crate::{
 };
 use ::rpc::Peer;
 use anyhow::anyhow;
-use call::{room, Room};
+use call::{room, ParticipantLocation, Room};
 use client::{
     self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Connection,
     Credentials, EstablishConnectionError, User, UserStore, RECEIVE_TIMEOUT,
@@ -40,7 +40,7 @@ use serde_json::json;
 use settings::{Formatter, Settings};
 use sqlx::types::time::OffsetDateTime;
 use std::{
-    cell::RefCell,
+    cell::{Cell, RefCell},
     env, mem,
     ops::Deref,
     path::{Path, PathBuf},
@@ -637,6 +637,164 @@ async fn test_room_events(
     }
 }
 
+#[gpui::test(iterations = 10)]
+async fn test_room_location(
+    deterministic: Arc<Deterministic>,
+    cx_a: &mut TestAppContext,
+    cx_b: &mut TestAppContext,
+) {
+    deterministic.forbid_parking();
+    let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
+    let client_a = server.create_client(cx_a, "user_a").await;
+    let client_b = server.create_client(cx_b, "user_b").await;
+    client_a.fs.insert_tree("/a", json!({})).await;
+    client_b.fs.insert_tree("/b", json!({})).await;
+
+    let (project_a, _) = client_a.build_local_project("/a", cx_a).await;
+    let (project_b, _) = client_b.build_local_project("/b", cx_b).await;
+
+    let (room_id, mut rooms) = server
+        .create_rooms(&mut [(&client_a, cx_a), (&client_b, cx_b)])
+        .await;
+
+    let room_a = rooms.remove(0);
+    let room_a_notified = Rc::new(Cell::new(false));
+    cx_a.update({
+        let room_a_notified = room_a_notified.clone();
+        |cx| {
+            cx.observe(&room_a, move |_, _| room_a_notified.set(true))
+                .detach()
+        }
+    });
+
+    let room_b = rooms.remove(0);
+    let room_b_notified = Rc::new(Cell::new(false));
+    cx_b.update({
+        let room_b_notified = room_b_notified.clone();
+        |cx| {
+            cx.observe(&room_b, move |_, _| room_b_notified.set(true))
+                .detach()
+        }
+    });
+
+    let project_a_id = project_a
+        .update(cx_a, |project, cx| project.share(room_id, cx))
+        .await
+        .unwrap();
+    deterministic.run_until_parked();
+    assert!(room_a_notified.take());
+    assert_eq!(
+        participant_locations(&room_a, cx_a),
+        vec![("user_b".to_string(), ParticipantLocation::External)]
+    );
+    assert!(room_b_notified.take());
+    assert_eq!(
+        participant_locations(&room_b, cx_b),
+        vec![("user_a".to_string(), ParticipantLocation::External)]
+    );
+
+    let project_b_id = project_b
+        .update(cx_b, |project, cx| project.share(room_id, cx))
+        .await
+        .unwrap();
+    deterministic.run_until_parked();
+    assert!(room_a_notified.take());
+    assert_eq!(
+        participant_locations(&room_a, cx_a),
+        vec![("user_b".to_string(), ParticipantLocation::External)]
+    );
+    assert!(room_b_notified.take());
+    assert_eq!(
+        participant_locations(&room_b, cx_b),
+        vec![("user_a".to_string(), ParticipantLocation::External)]
+    );
+
+    room_a
+        .update(cx_a, |room, cx| room.set_location(Some(&project_a), cx))
+        .await
+        .unwrap();
+    deterministic.run_until_parked();
+    assert!(room_a_notified.take());
+    assert_eq!(
+        participant_locations(&room_a, cx_a),
+        vec![("user_b".to_string(), ParticipantLocation::External)]
+    );
+    assert!(room_b_notified.take());
+    assert_eq!(
+        participant_locations(&room_b, cx_b),
+        vec![(
+            "user_a".to_string(),
+            ParticipantLocation::Project {
+                project_id: project_a_id
+            }
+        )]
+    );
+
+    room_b
+        .update(cx_b, |room, cx| room.set_location(Some(&project_b), cx))
+        .await
+        .unwrap();
+    deterministic.run_until_parked();
+    assert!(room_a_notified.take());
+    assert_eq!(
+        participant_locations(&room_a, cx_a),
+        vec![(
+            "user_b".to_string(),
+            ParticipantLocation::Project {
+                project_id: project_b_id
+            }
+        )]
+    );
+    assert!(room_b_notified.take());
+    assert_eq!(
+        participant_locations(&room_b, cx_b),
+        vec![(
+            "user_a".to_string(),
+            ParticipantLocation::Project {
+                project_id: project_a_id
+            }
+        )]
+    );
+
+    room_b
+        .update(cx_b, |room, cx| room.set_location(None, cx))
+        .await
+        .unwrap();
+    deterministic.run_until_parked();
+    assert!(room_a_notified.take());
+    assert_eq!(
+        participant_locations(&room_a, cx_a),
+        vec![("user_b".to_string(), ParticipantLocation::External)]
+    );
+    assert!(room_b_notified.take());
+    assert_eq!(
+        participant_locations(&room_b, cx_b),
+        vec![(
+            "user_a".to_string(),
+            ParticipantLocation::Project {
+                project_id: project_a_id
+            }
+        )]
+    );
+
+    fn participant_locations(
+        room: &ModelHandle<Room>,
+        cx: &TestAppContext,
+    ) -> Vec<(String, ParticipantLocation)> {
+        room.read_with(cx, |room, _| {
+            room.remote_participants()
+                .values()
+                .map(|participant| {
+                    (
+                        participant.user.github_login.to_string(),
+                        participant.location,
+                    )
+                })
+                .collect()
+        })
+    }
+}
+
 #[gpui::test(iterations = 10)]
 async fn test_propagate_saves_and_fs_changes(
     cx_a: &mut TestAppContext,

crates/collab/src/rpc.rs 🔗

@@ -151,6 +151,7 @@ impl Server {
             .add_message_handler(Server::leave_room)
             .add_request_handler(Server::call)
             .add_message_handler(Server::decline_call)
+            .add_request_handler(Server::update_participant_location)
             .add_request_handler(Server::share_project)
             .add_message_handler(Server::unshare_project)
             .add_request_handler(Server::join_project)
@@ -719,6 +720,23 @@ impl Server {
         Ok(())
     }
 
+    async fn update_participant_location(
+        self: Arc<Server>,
+        request: TypedEnvelope<proto::UpdateParticipantLocation>,
+        response: Response<proto::UpdateParticipantLocation>,
+    ) -> Result<()> {
+        let room_id = request.payload.room_id;
+        let location = request
+            .payload
+            .location
+            .ok_or_else(|| anyhow!("invalid location"))?;
+        let mut store = self.store().await;
+        let room = store.update_participant_location(room_id, location, request.sender_id)?;
+        self.room_updated(room);
+        response.send(proto::Ack {})?;
+        Ok(())
+    }
+
     fn room_updated(&self, room: &proto::Room) {
         for participant in &room.participants {
             self.peer

crates/collab/src/rpc/store.rs 🔗

@@ -585,6 +585,37 @@ impl Store {
         }
     }
 
+    pub fn update_participant_location(
+        &mut self,
+        room_id: RoomId,
+        location: proto::ParticipantLocation,
+        connection_id: ConnectionId,
+    ) -> Result<&proto::Room> {
+        let room = self
+            .rooms
+            .get_mut(&room_id)
+            .ok_or_else(|| anyhow!("no such room"))?;
+        if let Some(proto::participant_location::Variant::Project(project)) =
+            location.variant.as_ref()
+        {
+            anyhow::ensure!(
+                room.participants
+                    .iter()
+                    .any(|participant| participant.project_ids.contains(&project.id)),
+                "no such project"
+            );
+        }
+
+        let participant = room
+            .participants
+            .iter_mut()
+            .find(|participant| participant.peer_id == connection_id.0)
+            .ok_or_else(|| anyhow!("no such room"))?;
+        participant.location = Some(location);
+
+        Ok(room)
+    }
+
     pub fn share_project(
         &mut self,
         room_id: RoomId,

crates/rpc/proto/zed.proto 🔗

@@ -20,6 +20,7 @@ message Envelope {
         IncomingCall incoming_call = 1000;
         CancelCall cancel_call = 1001;
         DeclineCall decline_call = 13;
+        UpdateParticipantLocation update_participant_location = 1003;
         RoomUpdated room_updated = 14;
 
         ShareProject share_project = 15;
@@ -190,6 +191,11 @@ message CancelCall {}
 
 message DeclineCall {}
 
+message UpdateParticipantLocation {
+    uint64 room_id = 1;
+    ParticipantLocation location = 2;
+}
+
 message RoomUpdated {
     Room room = 1;
 }

crates/rpc/src/proto.rs 🔗

@@ -170,6 +170,7 @@ messages!(
     (UpdateFollowers, Foreground),
     (UpdateInviteInfo, Foreground),
     (UpdateLanguageServer, Foreground),
+    (UpdateParticipantLocation, Foreground),
     (UpdateProject, Foreground),
     (UpdateWorktree, Foreground),
     (UpdateWorktreeExtensions, Background),
@@ -222,6 +223,7 @@ request_messages!(
     (ShareProject, ShareProjectResponse),
     (Test, Test),
     (UpdateBuffer, Ack),
+    (UpdateParticipantLocation, Ack),
     (UpdateWorktree, Ack),
 );