Start moving `Store` state into the database

Antonio Scandurra created

Change summary

crates/call/src/call.rs                                          |  20 
crates/call/src/room.rs                                          |   8 
crates/collab/migrations.sqlite/20221109000000_test_schema.sql   |   2 
crates/collab/migrations/20221111092550_reconnection_support.sql |   2 
crates/collab/src/db.rs                                          | 354 +
crates/collab/src/integration_tests.rs                           |  14 
crates/collab/src/rpc.rs                                         | 115 
crates/collab/src/rpc/store.rs                                   | 248 -
crates/collab_ui/src/incoming_call_notification.rs               |   6 
crates/rpc/proto/zed.proto                                       |  13 
crates/rpc/src/rpc.rs                                            |   2 
11 files changed, 447 insertions(+), 337 deletions(-)

Detailed changes

crates/call/src/call.rs 🔗

@@ -22,7 +22,7 @@ pub fn init(client: Arc<Client>, user_store: ModelHandle<UserStore>, cx: &mut Mu
 #[derive(Clone)]
 pub struct IncomingCall {
     pub room_id: u64,
-    pub caller: Arc<User>,
+    pub calling_user: Arc<User>,
     pub participants: Vec<Arc<User>>,
     pub initial_project: Option<proto::ParticipantProject>,
 }
@@ -78,9 +78,9 @@ impl ActiveCall {
                     user_store.get_users(envelope.payload.participant_user_ids, cx)
                 })
                 .await?,
-            caller: user_store
+            calling_user: user_store
                 .update(&mut cx, |user_store, cx| {
-                    user_store.get_user(envelope.payload.caller_user_id, cx)
+                    user_store.get_user(envelope.payload.calling_user_id, cx)
                 })
                 .await?,
             initial_project: envelope.payload.initial_project,
@@ -110,13 +110,13 @@ impl ActiveCall {
 
     pub fn invite(
         &mut self,
-        recipient_user_id: u64,
+        called_user_id: u64,
         initial_project: Option<ModelHandle<Project>>,
         cx: &mut ModelContext<Self>,
     ) -> Task<Result<()>> {
         let client = self.client.clone();
         let user_store = self.user_store.clone();
-        if !self.pending_invites.insert(recipient_user_id) {
+        if !self.pending_invites.insert(called_user_id) {
             return Task::ready(Err(anyhow!("user was already invited")));
         }
 
@@ -136,13 +136,13 @@ impl ActiveCall {
                     };
 
                     room.update(&mut cx, |room, cx| {
-                        room.call(recipient_user_id, initial_project_id, cx)
+                        room.call(called_user_id, initial_project_id, cx)
                     })
                     .await?;
                 } else {
                     let room = cx
                         .update(|cx| {
-                            Room::create(recipient_user_id, initial_project, client, user_store, cx)
+                            Room::create(called_user_id, initial_project, client, user_store, cx)
                         })
                         .await?;
 
@@ -155,7 +155,7 @@ impl ActiveCall {
 
             let result = invite.await;
             this.update(&mut cx, |this, cx| {
-                this.pending_invites.remove(&recipient_user_id);
+                this.pending_invites.remove(&called_user_id);
                 cx.notify();
             });
             result
@@ -164,7 +164,7 @@ impl ActiveCall {
 
     pub fn cancel_invite(
         &mut self,
-        recipient_user_id: u64,
+        called_user_id: u64,
         cx: &mut ModelContext<Self>,
     ) -> Task<Result<()>> {
         let room_id = if let Some(room) = self.room() {
@@ -178,7 +178,7 @@ impl ActiveCall {
             client
                 .request(proto::CancelCall {
                     room_id,
-                    recipient_user_id,
+                    called_user_id,
                 })
                 .await?;
             anyhow::Ok(())

crates/call/src/room.rs 🔗

@@ -149,7 +149,7 @@ impl Room {
     }
 
     pub(crate) fn create(
-        recipient_user_id: u64,
+        called_user_id: u64,
         initial_project: Option<ModelHandle<Project>>,
         client: Arc<Client>,
         user_store: ModelHandle<UserStore>,
@@ -182,7 +182,7 @@ impl Room {
             match room
                 .update(&mut cx, |room, cx| {
                     room.leave_when_empty = true;
-                    room.call(recipient_user_id, initial_project_id, cx)
+                    room.call(called_user_id, initial_project_id, cx)
                 })
                 .await
             {
@@ -487,7 +487,7 @@ impl Room {
 
     pub(crate) fn call(
         &mut self,
-        recipient_user_id: u64,
+        called_user_id: u64,
         initial_project_id: Option<u64>,
         cx: &mut ModelContext<Self>,
     ) -> Task<Result<()>> {
@@ -503,7 +503,7 @@ impl Room {
             let result = client
                 .request(proto::Call {
                     room_id,
-                    recipient_user_id,
+                    called_user_id,
                     initial_project_id,
                 })
                 .await;

crates/collab/migrations.sqlite/20221109000000_test_schema.sql 🔗

@@ -82,4 +82,4 @@ CREATE TABLE "calls" (
     "answering_connection_id" INTEGER,
     "initial_project_id" INTEGER REFERENCES projects (id)
 );
-CREATE UNIQUE INDEX "index_calls_on_calling_user_id" ON "calls" ("calling_user_id");
+CREATE UNIQUE INDEX "index_calls_on_called_user_id" ON "calls" ("called_user_id");

crates/collab/migrations/20221111092550_reconnection_support.sql 🔗

@@ -44,4 +44,4 @@ CREATE TABLE IF NOT EXISTS "calls" (
     "answering_connection_id" INTEGER,
     "initial_project_id" INTEGER REFERENCES projects (id)
 );
-CREATE UNIQUE INDEX "index_calls_on_calling_user_id" ON "calls" ("calling_user_id");
+CREATE UNIQUE INDEX "index_calls_on_called_user_id" ON "calls" ("called_user_id");

crates/collab/src/db.rs 🔗

@@ -3,6 +3,7 @@ use anyhow::anyhow;
 use axum::http::StatusCode;
 use collections::HashMap;
 use futures::StreamExt;
+use rpc::{proto, ConnectionId};
 use serde::{Deserialize, Serialize};
 use sqlx::{
     migrate::{Migrate as _, Migration, MigrationSource},
@@ -565,6 +566,7 @@ where
     for<'a> i64: sqlx::Encode<'a, D> + sqlx::Decode<'a, D>,
     for<'a> bool: sqlx::Encode<'a, D> + sqlx::Decode<'a, D>,
     for<'a> Uuid: sqlx::Encode<'a, D> + sqlx::Decode<'a, D>,
+    for<'a> Option<ProjectId>: sqlx::Encode<'a, D> + sqlx::Decode<'a, D>,
     for<'a> sqlx::types::JsonValue: sqlx::Encode<'a, D> + sqlx::Decode<'a, D>,
     for<'a> OffsetDateTime: sqlx::Encode<'a, D> + sqlx::Decode<'a, D>,
     for<'a> PrimitiveDateTime: sqlx::Decode<'a, D> + sqlx::Decode<'a, D>,
@@ -882,42 +884,352 @@ where
         })
     }
 
-    // projects
-
-    /// Registers a new project for the given user.
-    pub async fn register_project(&self, host_user_id: UserId) -> Result<ProjectId> {
+    pub async fn create_room(
+        &self,
+        user_id: UserId,
+        connection_id: ConnectionId,
+    ) -> Result<proto::Room> {
         test_support!(self, {
-            Ok(sqlx::query_scalar(
+            let mut tx = self.pool.begin().await?;
+            let live_kit_room = nanoid::nanoid!(30);
+            let room_id = sqlx::query_scalar(
                 "
-                INSERT INTO projects(host_user_id)
-                VALUES ($1)
+                INSERT INTO rooms (live_kit_room, version)
+                VALUES ($1, $2)
                 RETURNING id
                 ",
             )
-            .bind(host_user_id)
-            .fetch_one(&self.pool)
+            .bind(&live_kit_room)
+            .bind(0)
+            .fetch_one(&mut tx)
             .await
-            .map(ProjectId)?)
+            .map(RoomId)?;
+
+            sqlx::query(
+                "
+                INSERT INTO room_participants (room_id, user_id, connection_id)
+                VALUES ($1, $2, $3)
+                ",
+            )
+            .bind(room_id)
+            .bind(user_id)
+            .bind(connection_id.0 as i32)
+            .execute(&mut tx)
+            .await?;
+
+            sqlx::query(
+                "
+                INSERT INTO calls (room_id, calling_user_id, called_user_id, answering_connection_id)
+                VALUES ($1, $2, $3, $4)
+                ",
+            )
+            .bind(room_id)
+            .bind(user_id)
+            .bind(user_id)
+            .bind(connection_id.0 as i32)
+            .execute(&mut tx)
+            .await?;
+
+            self.commit_room_transaction(room_id, tx).await
         })
     }
 
-    /// Unregisters a project for the given project id.
-    pub async fn unregister_project(&self, project_id: ProjectId) -> Result<()> {
+    pub async fn call(
+        &self,
+        room_id: RoomId,
+        calling_user_id: UserId,
+        called_user_id: UserId,
+        initial_project_id: Option<ProjectId>,
+    ) -> Result<proto::Room> {
         test_support!(self, {
+            let mut tx = self.pool.begin().await?;
             sqlx::query(
                 "
-                UPDATE projects
-                SET unregistered = TRUE
-                WHERE id = $1
+                INSERT INTO calls (room_id, calling_user_id, called_user_id, initial_project_id)
+                VALUES ($1, $2, $3, $4)
+                ",
+            )
+            .bind(room_id)
+            .bind(calling_user_id)
+            .bind(called_user_id)
+            .bind(initial_project_id)
+            .execute(&mut tx)
+            .await?;
+
+            sqlx::query(
+                "
+                INSERT INTO room_participants (room_id, user_id)
+                VALUES ($1, $2)
+                ",
+            )
+            .bind(room_id)
+            .bind(called_user_id)
+            .execute(&mut tx)
+            .await?;
+
+            self.commit_room_transaction(room_id, tx).await
+        })
+    }
+
+    pub async fn call_failed(
+        &self,
+        room_id: RoomId,
+        called_user_id: UserId,
+    ) -> Result<proto::Room> {
+        test_support!(self, {
+            let mut tx = self.pool.begin().await?;
+            sqlx::query(
+                "
+                DELETE FROM calls
+                WHERE room_id = $1 AND called_user_id = $2
+                ",
+            )
+            .bind(room_id)
+            .bind(called_user_id)
+            .execute(&mut tx)
+            .await?;
+
+            sqlx::query(
+                "
+                DELETE FROM room_participants
+                WHERE room_id = $1 AND user_id = $2
+                ",
+            )
+            .bind(room_id)
+            .bind(called_user_id)
+            .execute(&mut tx)
+            .await?;
+
+            self.commit_room_transaction(room_id, tx).await
+        })
+    }
+
+    pub async fn update_room_participant_location(
+        &self,
+        room_id: RoomId,
+        user_id: UserId,
+        location: proto::ParticipantLocation,
+    ) -> Result<proto::Room> {
+        test_support!(self, {
+            let mut tx = self.pool.begin().await?;
+
+            let location_kind;
+            let location_project_id;
+            match location
+                .variant
+                .ok_or_else(|| anyhow!("invalid location"))?
+            {
+                proto::participant_location::Variant::SharedProject(project) => {
+                    location_kind = 0;
+                    location_project_id = Some(ProjectId::from_proto(project.id));
+                }
+                proto::participant_location::Variant::UnsharedProject(_) => {
+                    location_kind = 1;
+                    location_project_id = None;
+                }
+                proto::participant_location::Variant::External(_) => {
+                    location_kind = 2;
+                    location_project_id = None;
+                }
+            }
+
+            sqlx::query(
+                "
+                UPDATE room_participants
+                SET location_kind = $1 AND location_project_id = $2
+                WHERE room_id = $1 AND user_id = $2
+                ",
+            )
+            .bind(location_kind)
+            .bind(location_project_id)
+            .bind(room_id)
+            .bind(user_id)
+            .execute(&mut tx)
+            .await?;
+
+            self.commit_room_transaction(room_id, tx).await
+        })
+    }
+
+    async fn commit_room_transaction(
+        &self,
+        room_id: RoomId,
+        mut tx: sqlx::Transaction<'_, D>,
+    ) -> Result<proto::Room> {
+        sqlx::query(
+            "
+            UPDATE rooms
+            SET version = version + 1
+            WHERE id = $1
+            ",
+        )
+        .bind(room_id)
+        .execute(&mut tx)
+        .await?;
+
+        let room: Room = sqlx::query_as(
+            "
+            SELECT *
+            FROM rooms
+            WHERE id = $1
+            ",
+        )
+        .bind(room_id)
+        .fetch_one(&mut tx)
+        .await?;
+
+        let mut db_participants =
+            sqlx::query_as::<_, (UserId, Option<i32>, Option<i32>, Option<i32>)>(
+                "
+                SELECT user_id, connection_id, location_kind, location_project_id
+                FROM room_participants
+                WHERE room_id = $1
+                ",
+            )
+            .bind(room_id)
+            .fetch(&mut tx);
+
+        let mut participants = Vec::new();
+        let mut pending_participant_user_ids = Vec::new();
+        while let Some(participant) = db_participants.next().await {
+            let (user_id, connection_id, _location_kind, _location_project_id) = participant?;
+            if let Some(connection_id) = connection_id {
+                participants.push(proto::Participant {
+                    user_id: user_id.to_proto(),
+                    peer_id: connection_id as u32,
+                    projects: Default::default(),
+                    location: Some(proto::ParticipantLocation {
+                        variant: Some(proto::participant_location::Variant::External(
+                            Default::default(),
+                        )),
+                    }),
+                });
+            } else {
+                pending_participant_user_ids.push(user_id.to_proto());
+            }
+        }
+        drop(db_participants);
+
+        for participant in &mut participants {
+            let mut entries = sqlx::query_as::<_, (ProjectId, String)>(
+                "
+                SELECT projects.id, worktrees.root_name
+                FROM projects
+                LEFT JOIN worktrees ON projects.id = worktrees.project_id
+                WHERE room_id = $1 AND host_user_id = $2
+                ",
+            )
+            .bind(room_id)
+            .fetch(&mut tx);
+
+            let mut projects = HashMap::default();
+            while let Some(entry) = entries.next().await {
+                let (project_id, worktree_root_name) = entry?;
+                let participant_project =
+                    projects
+                        .entry(project_id)
+                        .or_insert(proto::ParticipantProject {
+                            id: project_id.to_proto(),
+                            worktree_root_names: Default::default(),
+                        });
+                participant_project
+                    .worktree_root_names
+                    .push(worktree_root_name);
+            }
+
+            participant.projects = projects.into_values().collect();
+        }
+
+        tx.commit().await?;
+
+        Ok(proto::Room {
+            id: room.id.to_proto(),
+            version: room.version as u64,
+            live_kit_room: room.live_kit_room,
+            participants,
+            pending_participant_user_ids,
+        })
+    }
+
+    // projects
+
+    pub async fn share_project(
+        &self,
+        user_id: UserId,
+        connection_id: ConnectionId,
+        room_id: RoomId,
+        worktrees: &[proto::WorktreeMetadata],
+    ) -> Result<(ProjectId, proto::Room)> {
+        test_support!(self, {
+            let mut tx = self.pool.begin().await?;
+            let project_id = sqlx::query_scalar(
+                "
+                INSERT INTO projects (host_user_id, room_id)
+                VALUES ($1)
+                RETURNING id
+            ",
+            )
+            .bind(user_id)
+            .bind(room_id)
+            .fetch_one(&mut tx)
+            .await
+            .map(ProjectId)?;
+
+            for worktree in worktrees {
+                sqlx::query(
+                    "
+                INSERT INTO worktrees (id, project_id, root_name)
+                ",
+                )
+                .bind(worktree.id as i32)
+                .bind(project_id)
+                .bind(&worktree.root_name)
+                .execute(&mut tx)
+                .await?;
+            }
+
+            sqlx::query(
+                "
+                INSERT INTO project_collaborators (
+                project_id,
+                connection_id,
+                user_id,
+                replica_id,
+                is_host
+                )
+                VALUES ($1, $2, $3, $4, $5)
                 ",
             )
             .bind(project_id)
-            .execute(&self.pool)
+            .bind(connection_id.0 as i32)
+            .bind(user_id)
+            .bind(0)
+            .bind(true)
+            .execute(&mut tx)
             .await?;
-            Ok(())
+
+            let room = self.commit_room_transaction(room_id, tx).await?;
+            Ok((project_id, room))
         })
     }
 
+    pub async fn unshare_project(&self, project_id: ProjectId) -> Result<()> {
+        todo!()
+        // test_support!(self, {
+        //     sqlx::query(
+        //         "
+        //         UPDATE projects
+        //         SET unregistered = TRUE
+        //         WHERE id = $1
+        //         ",
+        //     )
+        //     .bind(project_id)
+        //     .execute(&self.pool)
+        //     .await?;
+        //     Ok(())
+        // })
+    }
+
     // contacts
 
     pub async fn get_contacts(&self, user_id: UserId) -> Result<Vec<Contact>> {
@@ -1246,6 +1558,14 @@ pub struct User {
     pub connected_once: bool,
 }
 
+id_type!(RoomId);
+#[derive(Clone, Debug, Default, FromRow, Serialize, PartialEq)]
+pub struct Room {
+    pub id: RoomId,
+    pub version: i32,
+    pub live_kit_room: String,
+}
+
 id_type!(ProjectId);
 #[derive(Clone, Debug, Default, FromRow, Serialize, PartialEq)]
 pub struct Project {

crates/collab/src/integration_tests.rs 🔗

@@ -104,7 +104,7 @@ async fn test_basic_calls(
     // User B receives the call.
     let mut incoming_call_b = active_call_b.read_with(cx_b, |call, _| call.incoming());
     let call_b = incoming_call_b.next().await.unwrap().unwrap();
-    assert_eq!(call_b.caller.github_login, "user_a");
+    assert_eq!(call_b.calling_user.github_login, "user_a");
 
     // User B connects via another client and also receives a ring on the newly-connected client.
     let _client_b2 = server.create_client(cx_b2, "user_b").await;
@@ -112,7 +112,7 @@ async fn test_basic_calls(
     let mut incoming_call_b2 = active_call_b2.read_with(cx_b2, |call, _| call.incoming());
     deterministic.run_until_parked();
     let call_b2 = incoming_call_b2.next().await.unwrap().unwrap();
-    assert_eq!(call_b2.caller.github_login, "user_a");
+    assert_eq!(call_b2.calling_user.github_login, "user_a");
 
     // User B joins the room using the first client.
     active_call_b
@@ -165,7 +165,7 @@ async fn test_basic_calls(
 
     // User C receives the call, but declines it.
     let call_c = incoming_call_c.next().await.unwrap().unwrap();
-    assert_eq!(call_c.caller.github_login, "user_b");
+    assert_eq!(call_c.calling_user.github_login, "user_b");
     active_call_c.update(cx_c, |call, _| call.decline_incoming().unwrap());
     assert!(incoming_call_c.next().await.unwrap().is_none());
 
@@ -308,7 +308,7 @@ async fn test_room_uniqueness(
     // User B receives the call from user A.
     let mut incoming_call_b = active_call_b.read_with(cx_b, |call, _| call.incoming());
     let call_b1 = incoming_call_b.next().await.unwrap().unwrap();
-    assert_eq!(call_b1.caller.github_login, "user_a");
+    assert_eq!(call_b1.calling_user.github_login, "user_a");
 
     // Ensure calling users A and B from client C fails.
     active_call_c
@@ -367,7 +367,7 @@ async fn test_room_uniqueness(
         .unwrap();
     deterministic.run_until_parked();
     let call_b2 = incoming_call_b.next().await.unwrap().unwrap();
-    assert_eq!(call_b2.caller.github_login, "user_c");
+    assert_eq!(call_b2.calling_user.github_login, "user_c");
 }
 
 #[gpui::test(iterations = 10)]
@@ -695,7 +695,7 @@ async fn test_share_project(
     let incoming_call_b = active_call_b.read_with(cx_b, |call, _| call.incoming());
     deterministic.run_until_parked();
     let call = incoming_call_b.borrow().clone().unwrap();
-    assert_eq!(call.caller.github_login, "user_a");
+    assert_eq!(call.calling_user.github_login, "user_a");
     let initial_project = call.initial_project.unwrap();
     active_call_b
         .update(cx_b, |call, cx| call.accept_incoming(cx))
@@ -766,7 +766,7 @@ async fn test_share_project(
     let incoming_call_c = active_call_c.read_with(cx_c, |call, _| call.incoming());
     deterministic.run_until_parked();
     let call = incoming_call_c.borrow().clone().unwrap();
-    assert_eq!(call.caller.github_login, "user_b");
+    assert_eq!(call.calling_user.github_login, "user_b");
     let initial_project = call.initial_project.unwrap();
     active_call_c
         .update(cx_c, |call, cx| call.accept_incoming(cx))

crates/collab/src/rpc.rs 🔗

@@ -2,7 +2,7 @@ mod store;
 
 use crate::{
     auth,
-    db::{self, ProjectId, User, UserId},
+    db::{self, ProjectId, RoomId, User, UserId},
     AppState, Result,
 };
 use anyhow::anyhow;
@@ -486,7 +486,7 @@ impl Server {
         for project_id in projects_to_unshare {
             self.app_state
                 .db
-                .unregister_project(project_id)
+                .unshare_project(project_id)
                 .await
                 .trace_err();
         }
@@ -559,11 +559,11 @@ impl Server {
         request: Message<proto::CreateRoom>,
         response: Response<proto::CreateRoom>,
     ) -> Result<()> {
-        let room;
-        {
-            let mut store = self.store().await;
-            room = store.create_room(request.sender_connection_id)?.clone();
-        }
+        let room = self
+            .app_state
+            .db
+            .create_room(request.sender_user_id, request.sender_connection_id)
+            .await?;
 
         let live_kit_connection_info =
             if let Some(live_kit) = self.app_state.live_kit_client.as_ref() {
@@ -710,8 +710,9 @@ impl Server {
         request: Message<proto::Call>,
         response: Response<proto::Call>,
     ) -> Result<()> {
-        let caller_user_id = request.sender_user_id;
-        let recipient_user_id = UserId::from_proto(request.payload.recipient_user_id);
+        let room_id = RoomId::from_proto(request.payload.room_id);
+        let calling_user_id = request.sender_user_id;
+        let called_user_id = UserId::from_proto(request.payload.called_user_id);
         let initial_project_id = request
             .payload
             .initial_project_id
@@ -719,31 +720,44 @@ impl Server {
         if !self
             .app_state
             .db
-            .has_contact(caller_user_id, recipient_user_id)
+            .has_contact(calling_user_id, called_user_id)
             .await?
         {
             return Err(anyhow!("cannot call a user who isn't a contact"))?;
         }
 
-        let room_id = request.payload.room_id;
-        let mut calls = {
-            let mut store = self.store().await;
-            let (room, recipient_connection_ids, incoming_call) = store.call(
-                room_id,
-                recipient_user_id,
-                initial_project_id,
-                request.sender_connection_id,
-            )?;
-            self.room_updated(room);
-            recipient_connection_ids
-                .into_iter()
-                .map(|recipient_connection_id| {
-                    self.peer
-                        .request(recipient_connection_id, incoming_call.clone())
-                })
-                .collect::<FuturesUnordered<_>>()
+        let room = self
+            .app_state
+            .db
+            .call(room_id, calling_user_id, called_user_id, initial_project_id)
+            .await?;
+        self.room_updated(&room);
+        self.update_user_contacts(called_user_id).await?;
+
+        let incoming_call = proto::IncomingCall {
+            room_id: room_id.to_proto(),
+            calling_user_id: calling_user_id.to_proto(),
+            participant_user_ids: room
+                .participants
+                .iter()
+                .map(|participant| participant.user_id)
+                .collect(),
+            initial_project: room.participants.iter().find_map(|participant| {
+                let initial_project_id = initial_project_id?.to_proto();
+                participant
+                    .projects
+                    .iter()
+                    .find(|project| project.id == initial_project_id)
+                    .cloned()
+            }),
         };
-        self.update_user_contacts(recipient_user_id).await?;
+
+        let mut calls = self
+            .store()
+            .await
+            .connection_ids_for_user(called_user_id)
+            .map(|connection_id| self.peer.request(connection_id, incoming_call.clone()))
+            .collect::<FuturesUnordered<_>>();
 
         while let Some(call_response) = calls.next().await {
             match call_response.as_ref() {
@@ -757,12 +771,13 @@ impl Server {
             }
         }
 
-        {
-            let mut store = self.store().await;
-            let room = store.call_failed(room_id, recipient_user_id)?;
-            self.room_updated(&room);
-        }
-        self.update_user_contacts(recipient_user_id).await?;
+        let room = self
+            .app_state
+            .db
+            .call_failed(room_id, called_user_id)
+            .await?;
+        self.room_updated(&room);
+        self.update_user_contacts(called_user_id).await?;
 
         Err(anyhow!("failed to ring call recipient"))?
     }
@@ -772,7 +787,7 @@ impl Server {
         request: Message<proto::CancelCall>,
         response: Response<proto::CancelCall>,
     ) -> Result<()> {
-        let recipient_user_id = UserId::from_proto(request.payload.recipient_user_id);
+        let recipient_user_id = UserId::from_proto(request.payload.called_user_id);
         {
             let mut store = self.store().await;
             let (room, recipient_connection_ids) = store.cancel_call(
@@ -814,15 +829,17 @@ impl Server {
         request: Message<proto::UpdateParticipantLocation>,
         response: Response<proto::UpdateParticipantLocation>,
     ) -> Result<()> {
-        let room_id = request.payload.room_id;
+        let room_id = RoomId::from_proto(request.payload.room_id);
         let location = request
             .payload
             .location
             .ok_or_else(|| anyhow!("invalid location"))?;
-        let mut store = self.store().await;
-        let room =
-            store.update_participant_location(room_id, location, request.sender_connection_id)?;
-        self.room_updated(room);
+        let room = self
+            .app_state
+            .db
+            .update_room_participant_location(room_id, request.sender_user_id, location)
+            .await?;
+        self.room_updated(&room);
         response.send(proto::Ack {})?;
         Ok(())
     }
@@ -868,22 +885,20 @@ impl Server {
         request: Message<proto::ShareProject>,
         response: Response<proto::ShareProject>,
     ) -> Result<()> {
-        let project_id = self
+        let (project_id, room) = self
             .app_state
             .db
-            .register_project(request.sender_user_id)
+            .share_project(
+                request.sender_user_id,
+                request.sender_connection_id,
+                RoomId::from_proto(request.payload.room_id),
+                &request.payload.worktrees,
+            )
             .await?;
-        let mut store = self.store().await;
-        let room = store.share_project(
-            request.payload.room_id,
-            project_id,
-            request.payload.worktrees,
-            request.sender_connection_id,
-        )?;
         response.send(proto::ShareProjectResponse {
             project_id: project_id.to_proto(),
         })?;
-        self.room_updated(room);
+        self.room_updated(&room);
 
         Ok(())
     }

crates/collab/src/rpc/store.rs 🔗

@@ -1,12 +1,10 @@
 use crate::db::{self, ProjectId, UserId};
 use anyhow::{anyhow, Result};
 use collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet};
-use nanoid::nanoid;
 use rpc::{proto, ConnectionId};
 use serde::Serialize;
 use std::{borrow::Cow, mem, path::PathBuf, str};
 use tracing::instrument;
-use util::post_inc;
 
 pub type RoomId = u64;
 
@@ -34,7 +32,7 @@ struct ConnectionState {
 
 #[derive(Copy, Clone, Eq, PartialEq, Serialize)]
 pub struct Call {
-    pub caller_user_id: UserId,
+    pub calling_user_id: UserId,
     pub room_id: RoomId,
     pub connection_id: Option<ConnectionId>,
     pub initial_project_id: Option<ProjectId>,
@@ -147,7 +145,7 @@ impl Store {
                 let room = self.room(active_call.room_id)?;
                 Some(proto::IncomingCall {
                     room_id: active_call.room_id,
-                    caller_user_id: active_call.caller_user_id.to_proto(),
+                    calling_user_id: active_call.calling_user_id.to_proto(),
                     participant_user_ids: room
                         .participants
                         .iter()
@@ -285,47 +283,6 @@ impl Store {
         }
     }
 
-    pub fn create_room(&mut self, creator_connection_id: ConnectionId) -> Result<&proto::Room> {
-        let connection = self
-            .connections
-            .get_mut(&creator_connection_id)
-            .ok_or_else(|| anyhow!("no such connection"))?;
-        let connected_user = self
-            .connected_users
-            .get_mut(&connection.user_id)
-            .ok_or_else(|| anyhow!("no such connection"))?;
-        anyhow::ensure!(
-            connected_user.active_call.is_none(),
-            "can't create a room with an active call"
-        );
-
-        let room_id = post_inc(&mut self.next_room_id);
-        let room = proto::Room {
-            id: room_id,
-            participants: vec![proto::Participant {
-                user_id: connection.user_id.to_proto(),
-                peer_id: creator_connection_id.0,
-                projects: Default::default(),
-                location: Some(proto::ParticipantLocation {
-                    variant: Some(proto::participant_location::Variant::External(
-                        proto::participant_location::External {},
-                    )),
-                }),
-            }],
-            pending_participant_user_ids: Default::default(),
-            live_kit_room: nanoid!(30),
-        };
-
-        self.rooms.insert(room_id, room);
-        connected_user.active_call = Some(Call {
-            caller_user_id: connection.user_id,
-            room_id,
-            connection_id: Some(creator_connection_id),
-            initial_project_id: None,
-        });
-        Ok(self.rooms.get(&room_id).unwrap())
-    }
-
     pub fn join_room(
         &mut self,
         room_id: RoomId,
@@ -424,7 +381,7 @@ impl Store {
                     .get_mut(&UserId::from_proto(*pending_participant_user_id))
                 {
                     if let Some(call) = connected_user.active_call.as_ref() {
-                        if call.caller_user_id == user_id {
+                        if call.calling_user_id == user_id {
                             connected_user.active_call.take();
                             canceled_call_connection_ids
                                 .extend(connected_user.connection_ids.iter().copied());
@@ -462,101 +419,10 @@ impl Store {
         &self.rooms
     }
 
-    pub fn call(
-        &mut self,
-        room_id: RoomId,
-        recipient_user_id: UserId,
-        initial_project_id: Option<ProjectId>,
-        from_connection_id: ConnectionId,
-    ) -> Result<(&proto::Room, Vec<ConnectionId>, proto::IncomingCall)> {
-        let caller_user_id = self.user_id_for_connection(from_connection_id)?;
-
-        let recipient_connection_ids = self
-            .connection_ids_for_user(recipient_user_id)
-            .collect::<Vec<_>>();
-        let mut recipient = self
-            .connected_users
-            .get_mut(&recipient_user_id)
-            .ok_or_else(|| anyhow!("no such connection"))?;
-        anyhow::ensure!(
-            recipient.active_call.is_none(),
-            "recipient is already on another call"
-        );
-
-        let room = self
-            .rooms
-            .get_mut(&room_id)
-            .ok_or_else(|| anyhow!("no such room"))?;
-        anyhow::ensure!(
-            room.participants
-                .iter()
-                .any(|participant| participant.peer_id == from_connection_id.0),
-            "no such room"
-        );
-        anyhow::ensure!(
-            room.pending_participant_user_ids
-                .iter()
-                .all(|user_id| UserId::from_proto(*user_id) != recipient_user_id),
-            "cannot call the same user more than once"
-        );
-        room.pending_participant_user_ids
-            .push(recipient_user_id.to_proto());
-
-        if let Some(initial_project_id) = initial_project_id {
-            let project = self
-                .projects
-                .get(&initial_project_id)
-                .ok_or_else(|| anyhow!("no such project"))?;
-            anyhow::ensure!(project.room_id == room_id, "no such project");
-        }
-
-        recipient.active_call = Some(Call {
-            caller_user_id,
-            room_id,
-            connection_id: None,
-            initial_project_id,
-        });
-
-        Ok((
-            room,
-            recipient_connection_ids,
-            proto::IncomingCall {
-                room_id,
-                caller_user_id: caller_user_id.to_proto(),
-                participant_user_ids: room
-                    .participants
-                    .iter()
-                    .map(|participant| participant.user_id)
-                    .collect(),
-                initial_project: initial_project_id
-                    .and_then(|id| Self::build_participant_project(id, &self.projects)),
-            },
-        ))
-    }
-
-    pub fn call_failed(&mut self, room_id: RoomId, to_user_id: UserId) -> Result<&proto::Room> {
-        let mut recipient = self
-            .connected_users
-            .get_mut(&to_user_id)
-            .ok_or_else(|| anyhow!("no such connection"))?;
-        anyhow::ensure!(recipient
-            .active_call
-            .map_or(false, |call| call.room_id == room_id
-                && call.connection_id.is_none()));
-        recipient.active_call = None;
-        let room = self
-            .rooms
-            .get_mut(&room_id)
-            .ok_or_else(|| anyhow!("no such room"))?;
-        room.pending_participant_user_ids
-            .retain(|user_id| UserId::from_proto(*user_id) != to_user_id);
-        Ok(room)
-    }
-
     pub fn cancel_call(
         &mut self,
         room_id: RoomId,
-        recipient_user_id: UserId,
+        called_user_id: UserId,
         canceller_connection_id: ConnectionId,
     ) -> Result<(&proto::Room, HashSet<ConnectionId>)> {
         let canceller_user_id = self.user_id_for_connection(canceller_connection_id)?;
@@ -566,7 +432,7 @@ impl Store {
             .ok_or_else(|| anyhow!("no such connection"))?;
         let recipient = self
             .connected_users
-            .get(&recipient_user_id)
+            .get(&called_user_id)
             .ok_or_else(|| anyhow!("no such connection"))?;
         let canceller_active_call = canceller
             .active_call
@@ -595,9 +461,9 @@ impl Store {
             .get_mut(&room_id)
             .ok_or_else(|| anyhow!("no such room"))?;
         room.pending_participant_user_ids
-            .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
+            .retain(|user_id| UserId::from_proto(*user_id) != called_user_id);
 
-        let recipient = self.connected_users.get_mut(&recipient_user_id).unwrap();
+        let recipient = self.connected_users.get_mut(&called_user_id).unwrap();
         recipient.active_call.take();
 
         Ok((room, recipient.connection_ids.clone()))
@@ -608,10 +474,10 @@ impl Store {
         room_id: RoomId,
         recipient_connection_id: ConnectionId,
     ) -> Result<(&proto::Room, Vec<ConnectionId>)> {
-        let recipient_user_id = self.user_id_for_connection(recipient_connection_id)?;
+        let called_user_id = self.user_id_for_connection(recipient_connection_id)?;
         let recipient = self
             .connected_users
-            .get_mut(&recipient_user_id)
+            .get_mut(&called_user_id)
             .ok_or_else(|| anyhow!("no such connection"))?;
         if let Some(active_call) = recipient.active_call {
             anyhow::ensure!(active_call.room_id == room_id, "no such room");
@@ -621,112 +487,20 @@ impl Store {
             );
             recipient.active_call.take();
             let recipient_connection_ids = self
-                .connection_ids_for_user(recipient_user_id)
+                .connection_ids_for_user(called_user_id)
                 .collect::<Vec<_>>();
             let room = self
                 .rooms
                 .get_mut(&active_call.room_id)
                 .ok_or_else(|| anyhow!("no such room"))?;
             room.pending_participant_user_ids
-                .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id);
+                .retain(|user_id| UserId::from_proto(*user_id) != called_user_id);
             Ok((room, recipient_connection_ids))
         } else {
             Err(anyhow!("user is not being called"))
         }
     }
 
-    pub fn update_participant_location(
-        &mut self,
-        room_id: RoomId,
-        location: proto::ParticipantLocation,
-        connection_id: ConnectionId,
-    ) -> Result<&proto::Room> {
-        let room = self
-            .rooms
-            .get_mut(&room_id)
-            .ok_or_else(|| anyhow!("no such room"))?;
-        if let Some(proto::participant_location::Variant::SharedProject(project)) =
-            location.variant.as_ref()
-        {
-            anyhow::ensure!(
-                room.participants
-                    .iter()
-                    .flat_map(|participant| &participant.projects)
-                    .any(|participant_project| participant_project.id == project.id),
-                "no such project"
-            );
-        }
-
-        let participant = room
-            .participants
-            .iter_mut()
-            .find(|participant| participant.peer_id == connection_id.0)
-            .ok_or_else(|| anyhow!("no such room"))?;
-        participant.location = Some(location);
-
-        Ok(room)
-    }
-
-    pub fn share_project(
-        &mut self,
-        room_id: RoomId,
-        project_id: ProjectId,
-        worktrees: Vec<proto::WorktreeMetadata>,
-        host_connection_id: ConnectionId,
-    ) -> Result<&proto::Room> {
-        let connection = self
-            .connections
-            .get_mut(&host_connection_id)
-            .ok_or_else(|| anyhow!("no such connection"))?;
-
-        let room = self
-            .rooms
-            .get_mut(&room_id)
-            .ok_or_else(|| anyhow!("no such room"))?;
-        let participant = room
-            .participants
-            .iter_mut()
-            .find(|participant| participant.peer_id == host_connection_id.0)
-            .ok_or_else(|| anyhow!("no such room"))?;
-
-        connection.projects.insert(project_id);
-        self.projects.insert(
-            project_id,
-            Project {
-                id: project_id,
-                room_id,
-                host_connection_id,
-                host: Collaborator {
-                    user_id: connection.user_id,
-                    replica_id: 0,
-                    admin: connection.admin,
-                },
-                guests: Default::default(),
-                active_replica_ids: Default::default(),
-                worktrees: worktrees
-                    .into_iter()
-                    .map(|worktree| {
-                        (
-                            worktree.id,
-                            Worktree {
-                                root_name: worktree.root_name,
-                                visible: worktree.visible,
-                                ..Default::default()
-                            },
-                        )
-                    })
-                    .collect(),
-                language_servers: Default::default(),
-            },
-        );
-
-        participant
-            .projects
-            .extend(Self::build_participant_project(project_id, &self.projects));
-
-        Ok(room)
-    }
-
     pub fn unshare_project(
         &mut self,
         project_id: ProjectId,

crates/collab_ui/src/incoming_call_notification.rs 🔗

@@ -74,7 +74,7 @@ impl IncomingCallNotification {
         let active_call = ActiveCall::global(cx);
         if action.accept {
             let join = active_call.update(cx, |active_call, cx| active_call.accept_incoming(cx));
-            let caller_user_id = self.call.caller.id;
+            let caller_user_id = self.call.calling_user.id;
             let initial_project_id = self.call.initial_project.as_ref().map(|project| project.id);
             cx.spawn_weak(|_, mut cx| async move {
                 join.await?;
@@ -105,7 +105,7 @@ impl IncomingCallNotification {
             .as_ref()
             .unwrap_or(&default_project);
         Flex::row()
-            .with_children(self.call.caller.avatar.clone().map(|avatar| {
+            .with_children(self.call.calling_user.avatar.clone().map(|avatar| {
                 Image::new(avatar)
                     .with_style(theme.caller_avatar)
                     .aligned()
@@ -115,7 +115,7 @@ impl IncomingCallNotification {
                 Flex::column()
                     .with_child(
                         Label::new(
-                            self.call.caller.github_login.clone(),
+                            self.call.calling_user.github_login.clone(),
                             theme.caller_username.text.clone(),
                         )
                         .contained()

crates/rpc/proto/zed.proto 🔗

@@ -164,9 +164,10 @@ message LeaveRoom {
 
 message Room {
     uint64 id = 1;
-    repeated Participant participants = 2;
-    repeated uint64 pending_participant_user_ids = 3;
-    string live_kit_room = 4;
+    uint64 version = 2;
+    repeated Participant participants = 3;
+    repeated uint64 pending_participant_user_ids = 4;
+    string live_kit_room = 5;
 }
 
 message Participant {
@@ -199,13 +200,13 @@ message ParticipantLocation {
 
 message Call {
     uint64 room_id = 1;
-    uint64 recipient_user_id = 2;
+    uint64 called_user_id = 2;
     optional uint64 initial_project_id = 3;
 }
 
 message IncomingCall {
     uint64 room_id = 1;
-    uint64 caller_user_id = 2;
+    uint64 calling_user_id = 2;
     repeated uint64 participant_user_ids = 3;
     optional ParticipantProject initial_project = 4;
 }
@@ -214,7 +215,7 @@ message CallCanceled {}
 
 message CancelCall {
     uint64 room_id = 1;
-    uint64 recipient_user_id = 2;
+    uint64 called_user_id = 2;
 }
 
 message DeclineCall {

crates/rpc/src/rpc.rs 🔗

@@ -6,4 +6,4 @@ pub use conn::Connection;
 pub use peer::*;
 mod macros;
 
-pub const PROTOCOL_VERSION: u32 = 39;
+pub const PROTOCOL_VERSION: u32 = 40;