Add room creation from channel join

Mikayla Maki and max created

co-authored-by: max <max@zed.dev>

Change summary

crates/call/src/call.rs                  |  98 ++++---------------
crates/call/src/room.rs                  |  59 ++++++++++-
crates/collab/src/db.rs                  |  27 ++++-
crates/collab/src/db/tests.rs            |  20 ++-
crates/collab/src/rpc.rs                 | 126 +++++++++++++++----------
crates/collab/src/tests/channel_tests.rs |  78 +++++++++------
crates/rpc/src/proto.rs                  |   2 
7 files changed, 229 insertions(+), 181 deletions(-)

Detailed changes

crates/call/src/call.rs 🔗

@@ -209,80 +209,6 @@ impl ActiveCall {
         })
     }
 
-    pub fn join_channel(
-        &mut self,
-        channel_id: u64,
-        cx: &mut ModelContext<Self>,
-    ) -> Task<Result<()>> {
-        let room = if let Some(room) = self.room().cloned() {
-            Some(Task::ready(Ok(room)).shared())
-        } else {
-            self.pending_room_creation.clone()
-        };
-
-        todo!()
-        // let invite = if let Some(room) = room {
-        //     cx.spawn_weak(|_, mut cx| async move {
-        //         let room = room.await.map_err(|err| anyhow!("{:?}", err))?;
-
-        //         // TODO join_channel:
-        //         // let initial_project_id = if let Some(initial_project) = initial_project {
-        //         //     Some(
-        //         //         room.update(&mut cx, |room, cx| room.share_project(initial_project, cx))
-        //         //             .await?,
-        //         //     )
-        //         // } else {
-        //         //     None
-        //         // };
-
-        //         // room.update(&mut cx, |room, cx| {
-        //         //     room.call(called_user_id, initial_project_id, cx)
-        //         // })
-        //         // .await?;
-
-        //         anyhow::Ok(())
-        //     })
-        // } else {
-        //     let client = self.client.clone();
-        //     let user_store = self.user_store.clone();
-        //     let room = cx
-        //         .spawn(|this, mut cx| async move {
-        //             let create_room = async {
-        //                 let room = cx
-        //                     .update(|cx| {
-        //                         Room::create_from_channel(channel_id, client, user_store, cx)
-        //                     })
-        //                     .await?;
-
-        //                 this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
-        //                     .await?;
-
-        //                 anyhow::Ok(room)
-        //             };
-
-        //             let room = create_room.await;
-        //             this.update(&mut cx, |this, _| this.pending_room_creation = None);
-        //             room.map_err(Arc::new)
-        //         })
-        //         .shared();
-        //     self.pending_room_creation = Some(room.clone());
-        //     cx.foreground().spawn(async move {
-        //         room.await.map_err(|err| anyhow!("{:?}", err))?;
-        //         anyhow::Ok(())
-        //     })
-        // };
-
-        // cx.spawn(|this, mut cx| async move {
-        //     let result = invite.await;
-        //     this.update(&mut cx, |this, cx| {
-        //         this.pending_invites.remove(&called_user_id);
-        //         this.report_call_event("invite", cx);
-        //         cx.notify();
-        //     });
-        //     result
-        // })
-    }
-
     pub fn cancel_invite(
         &mut self,
         called_user_id: u64,
@@ -348,6 +274,30 @@ impl ActiveCall {
         Ok(())
     }
 
+    pub fn join_channel(
+        &mut self,
+        channel_id: u64,
+        cx: &mut ModelContext<Self>,
+    ) -> Task<Result<()>> {
+        if let Some(room) = self.room().cloned() {
+            if room.read(cx).channel_id() == Some(channel_id) {
+                return Task::ready(Ok(()));
+            }
+        }
+
+        let join = Room::join_channel(channel_id, self.client.clone(), self.user_store.clone(), cx);
+
+        cx.spawn(|this, mut cx| async move {
+            let room = join.await?;
+            this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx))
+                .await?;
+            this.update(&mut cx, |this, cx| {
+                this.report_call_event("join channel", cx)
+            });
+            Ok(())
+        })
+    }
+
     pub fn hang_up(&mut self, cx: &mut ModelContext<Self>) -> Task<Result<()>> {
         cx.notify();
         self.report_call_event("hang up", cx);

crates/call/src/room.rs 🔗

@@ -49,6 +49,7 @@ pub enum Event {
 
 pub struct Room {
     id: u64,
+    channel_id: Option<u64>,
     live_kit: Option<LiveKitRoom>,
     status: RoomStatus,
     shared_projects: HashSet<WeakModelHandle<Project>>,
@@ -93,8 +94,25 @@ impl Entity for Room {
 }
 
 impl Room {
+    pub fn channel_id(&self) -> Option<u64> {
+        self.channel_id
+    }
+
+    #[cfg(any(test, feature = "test-support"))]
+    pub fn is_connected(&self) -> bool {
+        if let Some(live_kit) = self.live_kit.as_ref() {
+            matches!(
+                *live_kit.room.status().borrow(),
+                live_kit_client::ConnectionState::Connected { .. }
+            )
+        } else {
+            false
+        }
+    }
+
     fn new(
         id: u64,
+        channel_id: Option<u64>,
         live_kit_connection_info: Option<proto::LiveKitConnectionInfo>,
         client: Arc<Client>,
         user_store: ModelHandle<UserStore>,
@@ -185,6 +203,7 @@ impl Room {
 
         Self {
             id,
+            channel_id,
             live_kit: live_kit_room,
             status: RoomStatus::Online,
             shared_projects: Default::default(),
@@ -204,15 +223,6 @@ impl Room {
         }
     }
 
-    pub(crate) fn create_from_channel(
-        channel_id: u64,
-        client: Arc<Client>,
-        user_store: ModelHandle<UserStore>,
-        cx: &mut AppContext,
-    ) -> Task<Result<ModelHandle<Self>>> {
-        todo!()
-    }
-
     pub(crate) fn create(
         called_user_id: u64,
         initial_project: Option<ModelHandle<Project>>,
@@ -226,6 +236,7 @@ impl Room {
             let room = cx.add_model(|cx| {
                 Self::new(
                     room_proto.id,
+                    None,
                     response.live_kit_connection_info,
                     client,
                     user_store,
@@ -257,6 +268,35 @@ impl Room {
         })
     }
 
+    pub(crate) fn join_channel(
+        channel_id: u64,
+        client: Arc<Client>,
+        user_store: ModelHandle<UserStore>,
+        cx: &mut AppContext,
+    ) -> Task<Result<ModelHandle<Self>>> {
+        cx.spawn(|mut cx| async move {
+            let response = client.request(proto::JoinChannel { channel_id }).await?;
+            let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?;
+            let room = cx.add_model(|cx| {
+                Self::new(
+                    room_proto.id,
+                    Some(channel_id),
+                    response.live_kit_connection_info,
+                    client,
+                    user_store,
+                    cx,
+                )
+            });
+
+            room.update(&mut cx, |room, cx| {
+                room.apply_room_update(room_proto, cx)?;
+                anyhow::Ok(())
+            })?;
+
+            Ok(room)
+        })
+    }
+
     pub(crate) fn join(
         call: &IncomingCall,
         client: Arc<Client>,
@@ -270,6 +310,7 @@ impl Room {
             let room = cx.add_model(|cx| {
                 Self::new(
                     room_id,
+                    None,
                     response.live_kit_connection_info,
                     client,
                     user_store,

crates/collab/src/db.rs 🔗

@@ -1833,14 +1833,21 @@ impl Database {
                     .await?;
 
                 let room = self.get_room(room_id, &tx).await?;
-                if room.participants.is_empty() {
-                    room::Entity::delete_by_id(room_id).exec(&*tx).await?;
-                }
+                let deleted = if room.participants.is_empty() {
+                    let result = room::Entity::delete_by_id(room_id)
+                        .filter(room::Column::ChannelId.is_null())
+                        .exec(&*tx)
+                        .await?;
+                    result.rows_affected > 0
+                } else {
+                    false
+                };
 
                 let left_room = LeftRoom {
                     room,
                     left_projects,
                     canceled_calls_to_user_ids,
+                    deleted,
                 };
 
                 if left_room.room.participants.is_empty() {
@@ -3065,14 +3072,21 @@ impl Database {
 
     // channels
 
-    pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result<ChannelId> {
-        self.create_channel(name, None, creator_id).await
+    pub async fn create_root_channel(
+        &self,
+        name: &str,
+        live_kit_room: &str,
+        creator_id: UserId,
+    ) -> Result<ChannelId> {
+        self.create_channel(name, None, live_kit_room, creator_id)
+            .await
     }
 
     pub async fn create_channel(
         &self,
         name: &str,
         parent: Option<ChannelId>,
+        live_kit_room: &str,
         creator_id: UserId,
     ) -> Result<ChannelId> {
         self.transaction(move |tx| async move {
@@ -3106,7 +3120,7 @@ impl Database {
 
             room::ActiveModel {
                 channel_id: ActiveValue::Set(Some(channel.id)),
-                live_kit_room: ActiveValue::Set(format!("channel-{}", channel.id)),
+                live_kit_room: ActiveValue::Set(live_kit_room.to_string()),
                 ..Default::default()
             }
             .insert(&*tx)
@@ -3731,6 +3745,7 @@ pub struct LeftRoom {
     pub room: proto::Room,
     pub left_projects: HashMap<ProjectId, LeftProject>,
     pub canceled_calls_to_user_ids: Vec<UserId>,
+    pub deleted: bool,
 }
 
 pub struct RefreshedRoom {

crates/collab/src/db/tests.rs 🔗

@@ -899,19 +899,22 @@ test_both_dbs!(test_channels_postgres, test_channels_sqlite, db, {
         .unwrap()
         .user_id;
 
-    let zed_id = db.create_root_channel("zed", a_id).await.unwrap();
-    let crdb_id = db.create_channel("crdb", Some(zed_id), a_id).await.unwrap();
+    let zed_id = db.create_root_channel("zed", "1", a_id).await.unwrap();
+    let crdb_id = db
+        .create_channel("crdb", Some(zed_id), "2", a_id)
+        .await
+        .unwrap();
     let livestreaming_id = db
-        .create_channel("livestreaming", Some(zed_id), a_id)
+        .create_channel("livestreaming", Some(zed_id), "3", a_id)
         .await
         .unwrap();
     let replace_id = db
-        .create_channel("replace", Some(zed_id), a_id)
+        .create_channel("replace", Some(zed_id), "4", a_id)
         .await
         .unwrap();
-    let rust_id = db.create_root_channel("rust", a_id).await.unwrap();
+    let rust_id = db.create_root_channel("rust", "5", a_id).await.unwrap();
     let cargo_id = db
-        .create_channel("cargo", Some(rust_id), a_id)
+        .create_channel("cargo", Some(rust_id), "6", a_id)
         .await
         .unwrap();
 
@@ -988,7 +991,10 @@ test_both_dbs!(
             .unwrap()
             .user_id;
 
-        let channel_1 = db.create_root_channel("channel_1", user_1).await.unwrap();
+        let channel_1 = db
+            .create_root_channel("channel_1", "1", user_1)
+            .await
+            .unwrap();
         let room_1 = db.get_channel_room(channel_1).await.unwrap();
 
         // can join a room with membership to its channel

crates/collab/src/rpc.rs 🔗

@@ -186,7 +186,7 @@ impl Server {
 
         server
             .add_request_handler(ping)
-            .add_request_handler(create_room_request)
+            .add_request_handler(create_room)
             .add_request_handler(join_room)
             .add_request_handler(rejoin_room)
             .add_request_handler(leave_room)
@@ -859,12 +859,42 @@ async fn ping(_: proto::Ping, response: Response<proto::Ping>, _session: Session
     Ok(())
 }
 
-async fn create_room_request(
+async fn create_room(
     _request: proto::CreateRoom,
     response: Response<proto::CreateRoom>,
     session: Session,
 ) -> Result<()> {
-    let (room, live_kit_connection_info) = create_room(&session).await?;
+    let live_kit_room = nanoid::nanoid!(30);
+
+    let live_kit_connection_info = {
+        let live_kit_room = live_kit_room.clone();
+        let live_kit = session.live_kit_client.as_ref();
+
+        util::async_iife!({
+            let live_kit = live_kit?;
+
+            live_kit
+                .create_room(live_kit_room.clone())
+                .await
+                .trace_err()?;
+
+            let token = live_kit
+                .room_token(&live_kit_room, &session.user_id.to_string())
+                .trace_err()?;
+
+            Some(proto::LiveKitConnectionInfo {
+                server_url: live_kit.url().into(),
+                token,
+            })
+        })
+    }
+    .await;
+
+    let room = session
+        .db()
+        .await
+        .create_room(session.user_id, session.connection_id, &live_kit_room)
+        .await?;
 
     response.send(proto::CreateRoomResponse {
         room: Some(room.clone()),
@@ -1259,11 +1289,12 @@ async fn update_participant_location(
     let location = request
         .location
         .ok_or_else(|| anyhow!("invalid location"))?;
-    let room = session
-        .db()
-        .await
+
+    let db = session.db().await;
+    let room = db
         .update_room_participant_location(room_id, session.connection_id, location)
         .await?;
+
     room_updated(&room, &session.peer);
     response.send(proto::Ack {})?;
     Ok(())
@@ -2067,10 +2098,17 @@ async fn create_channel(
     session: Session,
 ) -> Result<()> {
     let db = session.db().await;
+    let live_kit_room = format!("channel-{}", nanoid::nanoid!(30));
+
+    if let Some(live_kit) = session.live_kit_client.as_ref() {
+        live_kit.create_room(live_kit_room.clone()).await?;
+    }
+
     let id = db
         .create_channel(
             &request.name,
             request.parent_id.map(|id| ChannelId::from_proto(id)),
+            &live_kit_room,
             session.user_id,
         )
         .await?;
@@ -2160,21 +2198,39 @@ async fn join_channel(
     response: Response<proto::JoinChannel>,
     session: Session,
 ) -> Result<()> {
-    let db = session.db().await;
     let channel_id = ChannelId::from_proto(request.channel_id);
 
-    todo!();
-    // db.check_channel_membership(session.user_id, channel_id)
-    //     .await?;
+    {
+        let db = session.db().await;
+        let room_id = db.get_channel_room(channel_id).await?;
+
+        let room = db
+            .join_room(
+                room_id,
+                session.user_id,
+                Some(channel_id),
+                session.connection_id,
+            )
+            .await?;
+
+        let live_kit_connection_info = session.live_kit_client.as_ref().and_then(|live_kit| {
+            let token = live_kit
+                .room_token(&room.live_kit_room, &session.user_id.to_string())
+                .trace_err()?;
 
-    let (room, live_kit_connection_info) = create_room(&session).await?;
+            Some(LiveKitConnectionInfo {
+                server_url: live_kit.url().into(),
+                token,
+            })
+        });
 
-    // db.set_channel_room(channel_id, room.id).await?;
+        response.send(proto::JoinRoomResponse {
+            room: Some(room.clone()),
+            live_kit_connection_info,
+        })?;
 
-    response.send(proto::CreateRoomResponse {
-        room: Some(room.clone()),
-        live_kit_connection_info,
-    })?;
+        room_updated(&room, &session.peer);
+    }
 
     update_user_contacts(session.user_id, &session).await?;
 
@@ -2367,7 +2423,7 @@ async fn leave_room_for_session(session: &Session) -> Result<()> {
         room_id = RoomId::from_proto(left_room.room.id);
         canceled_calls_to_user_ids = mem::take(&mut left_room.canceled_calls_to_user_ids);
         live_kit_room = mem::take(&mut left_room.room.live_kit_room);
-        delete_live_kit_room = left_room.room.participants.is_empty();
+        delete_live_kit_room = left_room.deleted;
     } else {
         return Ok(());
     }
@@ -2435,42 +2491,6 @@ fn project_left(project: &db::LeftProject, session: &Session) {
     }
 }
 
-async fn create_room(session: &Session) -> Result<(proto::Room, Option<LiveKitConnectionInfo>)> {
-    let live_kit_room = nanoid::nanoid!(30);
-
-    let live_kit_connection_info = {
-        let live_kit_room = live_kit_room.clone();
-        let live_kit = session.live_kit_client.as_ref();
-
-        util::async_iife!({
-            let live_kit = live_kit?;
-
-            live_kit
-                .create_room(live_kit_room.clone())
-                .await
-                .trace_err()?;
-
-            let token = live_kit
-                .room_token(&live_kit_room, &session.user_id.to_string())
-                .trace_err()?;
-
-            Some(proto::LiveKitConnectionInfo {
-                server_url: live_kit.url().into(),
-                token,
-            })
-        })
-    }
-    .await;
-
-    let room = session
-        .db()
-        .await
-        .create_room(session.user_id, session.connection_id, &live_kit_room)
-        .await?;
-
-    Ok((room, live_kit_connection_info))
-}
-
 pub trait ResultExt {
     type Ok;
 

crates/collab/src/tests/channel_tests.rs 🔗

@@ -108,17 +108,52 @@ async fn test_channel_room(
         .await
         .unwrap();
 
+    active_call_b
+        .update(cx_b, |active_call, cx| active_call.join_channel(zed_id, cx))
+        .await
+        .unwrap();
+
     deterministic.run_until_parked();
 
     let room_a = active_call_a.read_with(cx_a, |call, _| call.room().unwrap().clone());
+    room_a.read_with(cx_a, |room, _| assert!(room.is_connected()));
     assert_eq!(
         room_participants(&room_a, cx_a),
+        RoomParticipants {
+            remote: vec!["user_b".to_string()],
+            pending: vec![]
+        }
+    );
+
+    let room_b = active_call_b.read_with(cx_b, |call, _| call.room().unwrap().clone());
+    room_b.read_with(cx_b, |room, _| assert!(room.is_connected()));
+    assert_eq!(
+        room_participants(&room_b, cx_b),
         RoomParticipants {
             remote: vec!["user_a".to_string()],
             pending: vec![]
         }
     );
 
+    // Make sure that leaving and rejoining works
+
+    active_call_a
+        .update(cx_a, |active_call, cx| active_call.hang_up(cx))
+        .await
+        .unwrap();
+
+    active_call_b
+        .update(cx_b, |active_call, cx| active_call.hang_up(cx))
+        .await
+        .unwrap();
+
+    // Make sure room exists?
+
+    active_call_a
+        .update(cx_a, |active_call, cx| active_call.join_channel(zed_id, cx))
+        .await
+        .unwrap();
+
     active_call_b
         .update(cx_b, |active_call, cx| active_call.join_channel(zed_id, cx))
         .await
@@ -126,42 +161,23 @@ async fn test_channel_room(
 
     deterministic.run_until_parked();
 
-    let active_call_b = cx_b.read(ActiveCall::global);
+    let room_a = active_call_a.read_with(cx_a, |call, _| call.room().unwrap().clone());
+    room_a.read_with(cx_a, |room, _| assert!(room.is_connected()));
+    assert_eq!(
+        room_participants(&room_a, cx_a),
+        RoomParticipants {
+            remote: vec!["user_b".to_string()],
+            pending: vec![]
+        }
+    );
+
     let room_b = active_call_b.read_with(cx_b, |call, _| call.room().unwrap().clone());
+    room_b.read_with(cx_b, |room, _| assert!(room.is_connected()));
     assert_eq!(
         room_participants(&room_b, cx_b),
         RoomParticipants {
-            remote: vec!["user_a".to_string(), "user_b".to_string()],
+            remote: vec!["user_a".to_string()],
             pending: vec![]
         }
     );
 }
-
-// TODO:
-// Invariants to test:
-// 1. Dag structure is maintained for all operations (can't make a cycle)
-// 2. Can't be a member of a super channel, and accept a membership of a sub channel (by definition, a noop)
-
-// #[gpui::test]
-// async fn test_block_cycle_creation(deterministic: Arc<Deterministic>, cx: &mut TestAppContext) {
-//     // deterministic.forbid_parking();
-//     // let mut server = TestServer::start(&deterministic).await;
-//     // let client_a = server.create_client(cx, "user_a").await;
-//     // let a_id = crate::db::UserId(client_a.user_id().unwrap() as i32);
-//     // let db = server._test_db.db();
-
-//     // let zed_id = db.create_root_channel("zed", a_id).await.unwrap();
-//     // let first_id = db.create_channel("first", Some(zed_id)).await.unwrap();
-//     // let second_id = db
-//     //     .create_channel("second_id", Some(first_id))
-//     //     .await
-//     //     .unwrap();
-// }
-
-/*
-Linear things:
-- A way of expressing progress to the team
-- A way for us to agree on a scope
-- A way to figure out what we're supposed to be doing
-
-*/

crates/rpc/src/proto.rs 🔗

@@ -295,7 +295,7 @@ request_messages!(
     (RemoveContact, Ack),
     (RespondToContactRequest, Ack),
     (RespondToChannelInvite, Ack),
-    (JoinChannel, CreateRoomResponse),
+    (JoinChannel, JoinRoomResponse),
     (RenameProjectEntry, ProjectEntryResponse),
     (SaveBuffer, BufferSaved),
     (SearchProject, SearchProjectResponse),