Go back to a compiling state, panicking on unimplemented db methods

Antonio Scandurra created

Change summary

crates/collab/src/db.rs                      | 1228 +++++++++++++++++++++
crates/collab/src/db/project.rs              |   12 
crates/collab/src/db/project_collaborator.rs |    4 
crates/collab/src/db/user.rs                 |    8 
4 files changed, 1,205 insertions(+), 47 deletions(-)

Detailed changes

crates/collab/src/db.rs 🔗

@@ -12,7 +12,7 @@ mod worktree;
 
 use crate::{Error, Result};
 use anyhow::anyhow;
-use collections::HashMap;
+use collections::{BTreeMap, HashMap, HashSet};
 pub use contact::Contact;
 use dashmap::DashMap;
 use futures::StreamExt;
@@ -255,6 +255,19 @@ impl Database {
         .await
     }
 
+    pub async fn set_user_connected_once(&self, id: UserId, connected_once: bool) -> Result<()> {
+        self.transact(|tx| async move {
+            user::Entity::update_many()
+                .filter(user::Column::Id.eq(id))
+                .col_expr(user::Column::ConnectedOnce, connected_once.into())
+                .exec(&tx)
+                .await?;
+            tx.commit().await?;
+            Ok(())
+        })
+        .await
+    }
+
     pub async fn destroy_user(&self, id: UserId) -> Result<()> {
         self.transact(|tx| async move {
             access_token::Entity::delete_many()
@@ -360,6 +373,17 @@ impl Database {
         .await
     }
 
+    pub async fn is_user_busy(&self, user_id: UserId) -> Result<bool> {
+        self.transact(|tx| async move {
+            let participant = room_participant::Entity::find()
+                .filter(room_participant::Column::UserId.eq(user_id))
+                .one(&tx)
+                .await?;
+            Ok(participant.is_some())
+        })
+        .await
+    }
+
     pub async fn has_contact(&self, user_id_1: UserId, user_id_2: UserId) -> Result<bool> {
         self.transact(|tx| async move {
             let (id_a, id_b) = if user_id_1 < user_id_2 {
@@ -896,63 +920,447 @@ impl Database {
         .await
     }
 
-    // projects
+    // rooms
 
-    pub async fn share_project(
+    pub async fn incoming_call_for_user(
         &self,
-        room_id: RoomId,
-        connection_id: ConnectionId,
-        worktrees: &[proto::WorktreeMetadata],
-    ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
+        user_id: UserId,
+    ) -> Result<Option<proto::IncomingCall>> {
         self.transact(|tx| async move {
-            let participant = room_participant::Entity::find()
-                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0))
+            let pending_participant = room_participant::Entity::find()
+                .filter(
+                    room_participant::Column::UserId
+                        .eq(user_id)
+                        .and(room_participant::Column::AnsweringConnectionId.is_null()),
+                )
                 .one(&tx)
-                .await?
-                .ok_or_else(|| anyhow!("could not find participant"))?;
-            if participant.room_id != room_id {
-                return Err(anyhow!("shared project on unexpected room"))?;
-            }
+                .await?;
 
-            let project = project::ActiveModel {
-                room_id: ActiveValue::set(participant.room_id),
-                host_user_id: ActiveValue::set(participant.user_id),
-                host_connection_id: ActiveValue::set(connection_id.0 as i32),
-                ..Default::default()
+            if let Some(pending_participant) = pending_participant {
+                let room = self.get_room(pending_participant.room_id, &tx).await?;
+                Ok(Self::build_incoming_call(&room, user_id))
+            } else {
+                Ok(None)
             }
-            .insert(&tx)
-            .await?;
+        })
+        .await
+    }
 
-            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
-                id: ActiveValue::set(worktree.id as i32),
-                project_id: ActiveValue::set(project.id),
-                abs_path: ActiveValue::set(worktree.abs_path.clone()),
-                root_name: ActiveValue::set(worktree.root_name.clone()),
-                visible: ActiveValue::set(worktree.visible),
-                scan_id: ActiveValue::set(0),
-                is_complete: ActiveValue::set(false),
-            }))
-            .exec(&tx)
-            .await?;
+    pub async fn create_room(
+        &self,
+        user_id: UserId,
+        connection_id: ConnectionId,
+        live_kit_room: &str,
+    ) -> Result<RoomGuard<proto::Room>> {
+        self.transact(|tx| async move {
+            todo!()
+            // let room_id = sqlx::query_scalar(
+            //     "
+            //     INSERT INTO rooms (live_kit_room)
+            //     VALUES ($1)
+            //     RETURNING id
+            //     ",
+            // )
+            // .bind(&live_kit_room)
+            // .fetch_one(&mut tx)
+            // .await
+            // .map(RoomId)?;
+
+            // sqlx::query(
+            //     "
+            //     INSERT INTO room_participants (room_id, user_id, answering_connection_id, calling_user_id, calling_connection_id)
+            //     VALUES ($1, $2, $3, $4, $5)
+            //     ",
+            // )
+            // .bind(room_id)
+            // .bind(user_id)
+            // .bind(connection_id.0 as i32)
+            // .bind(user_id)
+            // .bind(connection_id.0 as i32)
+            // .execute(&mut tx)
+            // .await?;
+
+            // let room = self.get_room(room_id, &mut tx).await?;
+            // self.commit_room_transaction(room_id, tx, room).await
+        })
+        .await
+    }
 
-            project_collaborator::ActiveModel {
-                project_id: ActiveValue::set(project.id),
-                connection_id: ActiveValue::set(connection_id.0 as i32),
-                user_id: ActiveValue::set(participant.user_id),
-                replica_id: ActiveValue::set(0),
-                is_host: ActiveValue::set(true),
-                ..Default::default()
-            }
-            .insert(&tx)
-            .await?;
+    pub async fn call(
+        &self,
+        room_id: RoomId,
+        calling_user_id: UserId,
+        calling_connection_id: ConnectionId,
+        called_user_id: UserId,
+        initial_project_id: Option<ProjectId>,
+    ) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
+        self.transact(|tx| async move {
+            todo!()
+            // sqlx::query(
+            //     "
+            //     INSERT INTO room_participants (
+            //     room_id,
+            //     user_id,
+            //     calling_user_id,
+            //     calling_connection_id,
+            //     initial_project_id
+            //     )
+            //     VALUES ($1, $2, $3, $4, $5)
+            //     ",
+            // )
+            // .bind(room_id)
+            // .bind(called_user_id)
+            // .bind(calling_user_id)
+            // .bind(calling_connection_id.0 as i32)
+            // .bind(initial_project_id)
+            // .execute(&mut tx)
+            // .await?;
+
+            // let room = self.get_room(room_id, &mut tx).await?;
+            // let incoming_call = Self::build_incoming_call(&room, called_user_id)
+            //     .ok_or_else(|| anyhow!("failed to build incoming call"))?;
+            // self.commit_room_transaction(room_id, tx, (room, incoming_call))
+            // .await
+        })
+        .await
+    }
 
-            let room = self.get_room(room_id, &tx).await?;
-            self.commit_room_transaction(room_id, tx, (project.id, room))
-                .await
+    pub async fn call_failed(
+        &self,
+        room_id: RoomId,
+        called_user_id: UserId,
+    ) -> Result<RoomGuard<proto::Room>> {
+        self.transact(|tx| async move {
+            todo!()
+            // sqlx::query(
+            //     "
+            //     DELETE FROM room_participants
+            //     WHERE room_id = $1 AND user_id = $2
+            //     ",
+            // )
+            // .bind(room_id)
+            // .bind(called_user_id)
+            // .execute(&mut tx)
+            // .await?;
+
+            // let room = self.get_room(room_id, &mut tx).await?;
+            // self.commit_room_transaction(room_id, tx, room).await
         })
         .await
     }
 
+    pub async fn decline_call(
+        &self,
+        expected_room_id: Option<RoomId>,
+        user_id: UserId,
+    ) -> Result<RoomGuard<proto::Room>> {
+        self.transact(|tx| async move {
+            todo!()
+            // let room_id = sqlx::query_scalar(
+            //     "
+            //     DELETE FROM room_participants
+            //     WHERE user_id = $1 AND answering_connection_id IS NULL
+            //     RETURNING room_id
+            //     ",
+            // )
+            // .bind(user_id)
+            // .fetch_one(&mut tx)
+            // .await?;
+            // if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) {
+            //     return Err(anyhow!("declining call on unexpected room"))?;
+            // }
+
+            // let room = self.get_room(room_id, &mut tx).await?;
+            // self.commit_room_transaction(room_id, tx, room).await
+        })
+        .await
+    }
+
+    pub async fn cancel_call(
+        &self,
+        expected_room_id: Option<RoomId>,
+        calling_connection_id: ConnectionId,
+        called_user_id: UserId,
+    ) -> Result<RoomGuard<proto::Room>> {
+        self.transact(|tx| async move {
+            todo!()
+            // let room_id = sqlx::query_scalar(
+            //     "
+            //     DELETE FROM room_participants
+            //     WHERE user_id = $1 AND calling_connection_id = $2 AND answering_connection_id IS NULL
+            //     RETURNING room_id
+            //     ",
+            // )
+            // .bind(called_user_id)
+            // .bind(calling_connection_id.0 as i32)
+            // .fetch_one(&mut tx)
+            // .await?;
+            // if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) {
+            //     return Err(anyhow!("canceling call on unexpected room"))?;
+            // }
+
+            // let room = self.get_room(room_id, &mut tx).await?;
+            // self.commit_room_transaction(room_id, tx, room).await
+        })
+        .await
+    }
+
+    pub async fn join_room(
+        &self,
+        room_id: RoomId,
+        user_id: UserId,
+        connection_id: ConnectionId,
+    ) -> Result<RoomGuard<proto::Room>> {
+        self.transact(|tx| async move {
+            todo!()
+            // sqlx::query(
+            //     "
+            //     UPDATE room_participants
+            //     SET answering_connection_id = $1
+            //     WHERE room_id = $2 AND user_id = $3
+            //     RETURNING 1
+            //     ",
+            // )
+            // .bind(connection_id.0 as i32)
+            // .bind(room_id)
+            // .bind(user_id)
+            // .fetch_one(&mut tx)
+            // .await?;
+
+            // let room = self.get_room(room_id, &mut tx).await?;
+            // self.commit_room_transaction(room_id, tx, room).await
+        })
+        .await
+    }
+
+    pub async fn leave_room(
+        &self,
+        connection_id: ConnectionId,
+    ) -> Result<Option<RoomGuard<LeftRoom>>> {
+        self.transact(|tx| async move {
+            todo!()
+            // // Leave room.
+            // let room_id = sqlx::query_scalar::<_, RoomId>(
+            //     "
+            //     DELETE FROM room_participants
+            //     WHERE answering_connection_id = $1
+            //     RETURNING room_id
+            //     ",
+            // )
+            // .bind(connection_id.0 as i32)
+            // .fetch_optional(&mut tx)
+            // .await?;
+
+            // if let Some(room_id) = room_id {
+            //     // Cancel pending calls initiated by the leaving user.
+            //     let canceled_calls_to_user_ids: Vec<UserId> = sqlx::query_scalar(
+            //         "
+            //         DELETE FROM room_participants
+            //         WHERE calling_connection_id = $1 AND answering_connection_id IS NULL
+            //         RETURNING user_id
+            //         ",
+            //     )
+            //     .bind(connection_id.0 as i32)
+            //     .fetch_all(&mut tx)
+            //     .await?;
+
+            //     let project_ids = sqlx::query_scalar::<_, ProjectId>(
+            //         "
+            //         SELECT project_id
+            //         FROM project_collaborators
+            //         WHERE connection_id = $1
+            //         ",
+            //     )
+            //     .bind(connection_id.0 as i32)
+            //     .fetch_all(&mut tx)
+            //     .await?;
+
+            //     // Leave projects.
+            //     let mut left_projects = HashMap::default();
+            //     if !project_ids.is_empty() {
+            //         let mut params = "?,".repeat(project_ids.len());
+            //         params.pop();
+            //         let query = format!(
+            //             "
+            //             SELECT *
+            //             FROM project_collaborators
+            //             WHERE project_id IN ({params})
+            //             "
+            //         );
+            //         let mut query = sqlx::query_as::<_, ProjectCollaborator>(&query);
+            //         for project_id in project_ids {
+            //             query = query.bind(project_id);
+            //         }
+
+            //         let mut project_collaborators = query.fetch(&mut tx);
+            //         while let Some(collaborator) = project_collaborators.next().await {
+            //             let collaborator = collaborator?;
+            //             let left_project =
+            //                 left_projects
+            //                     .entry(collaborator.project_id)
+            //                     .or_insert(LeftProject {
+            //                         id: collaborator.project_id,
+            //                         host_user_id: Default::default(),
+            //                         connection_ids: Default::default(),
+            //                         host_connection_id: Default::default(),
+            //                     });
+
+            //             let collaborator_connection_id =
+            //                 ConnectionId(collaborator.connection_id as u32);
+            //             if collaborator_connection_id != connection_id {
+            //                 left_project.connection_ids.push(collaborator_connection_id);
+            //             }
+
+            //             if collaborator.is_host {
+            //                 left_project.host_user_id = collaborator.user_id;
+            //                 left_project.host_connection_id =
+            //                     ConnectionId(collaborator.connection_id as u32);
+            //             }
+            //         }
+            //     }
+            //     sqlx::query(
+            //         "
+            //         DELETE FROM project_collaborators
+            //         WHERE connection_id = $1
+            //         ",
+            //     )
+            //     .bind(connection_id.0 as i32)
+            //     .execute(&mut tx)
+            //     .await?;
+
+            //     // Unshare projects.
+            //     sqlx::query(
+            //         "
+            //         DELETE FROM projects
+            //         WHERE room_id = $1 AND host_connection_id = $2
+            //         ",
+            //     )
+            //     .bind(room_id)
+            //     .bind(connection_id.0 as i32)
+            //     .execute(&mut tx)
+            //     .await?;
+
+            //     let room = self.get_room(room_id, &mut tx).await?;
+            //     Ok(Some(
+            //         self.commit_room_transaction(
+            //             room_id,
+            //             tx,
+            //             LeftRoom {
+            //                 room,
+            //                 left_projects,
+            //                 canceled_calls_to_user_ids,
+            //             },
+            //         )
+            //         .await?,
+            //     ))
+            // } else {
+            //     Ok(None)
+            // }
+        })
+        .await
+    }
+
+    pub async fn update_room_participant_location(
+        &self,
+        room_id: RoomId,
+        connection_id: ConnectionId,
+        location: proto::ParticipantLocation,
+    ) -> Result<RoomGuard<proto::Room>> {
+        self.transact(|tx| async {
+            todo!()
+            // let mut tx = tx;
+            // let location_kind;
+            // let location_project_id;
+            // match location
+            //     .variant
+            //     .as_ref()
+            //     .ok_or_else(|| anyhow!("invalid location"))?
+            // {
+            //     proto::participant_location::Variant::SharedProject(project) => {
+            //         location_kind = 0;
+            //         location_project_id = Some(ProjectId::from_proto(project.id));
+            //     }
+            //     proto::participant_location::Variant::UnsharedProject(_) => {
+            //         location_kind = 1;
+            //         location_project_id = None;
+            //     }
+            //     proto::participant_location::Variant::External(_) => {
+            //         location_kind = 2;
+            //         location_project_id = None;
+            //     }
+            // }
+
+            // sqlx::query(
+            //     "
+            //     UPDATE room_participants
+            //     SET location_kind = $1, location_project_id = $2
+            //     WHERE room_id = $3 AND answering_connection_id = $4
+            //     RETURNING 1
+            //     ",
+            // )
+            // .bind(location_kind)
+            // .bind(location_project_id)
+            // .bind(room_id)
+            // .bind(connection_id.0 as i32)
+            // .fetch_one(&mut tx)
+            // .await?;
+
+            // let room = self.get_room(room_id, &mut tx).await?;
+            // self.commit_room_transaction(room_id, tx, room).await
+        })
+        .await
+    }
+
+    async fn get_guest_connection_ids(
+        &self,
+        project_id: ProjectId,
+        tx: &DatabaseTransaction,
+    ) -> Result<Vec<ConnectionId>> {
+        todo!()
+        // let mut guest_connection_ids = Vec::new();
+        // let mut db_guest_connection_ids = sqlx::query_scalar::<_, i32>(
+        //     "
+        //     SELECT connection_id
+        //     FROM project_collaborators
+        //     WHERE project_id = $1 AND is_host = FALSE
+        //     ",
+        // )
+        // .bind(project_id)
+        // .fetch(tx);
+        // while let Some(connection_id) = db_guest_connection_ids.next().await {
+        //     guest_connection_ids.push(ConnectionId(connection_id? as u32));
+        // }
+        // Ok(guest_connection_ids)
+    }
+
+    fn build_incoming_call(
+        room: &proto::Room,
+        called_user_id: UserId,
+    ) -> Option<proto::IncomingCall> {
+        let pending_participant = room
+            .pending_participants
+            .iter()
+            .find(|participant| participant.user_id == called_user_id.to_proto())?;
+
+        Some(proto::IncomingCall {
+            room_id: room.id,
+            calling_user_id: pending_participant.calling_user_id,
+            participant_user_ids: room
+                .participants
+                .iter()
+                .map(|participant| participant.user_id)
+                .collect(),
+            initial_project: room.participants.iter().find_map(|participant| {
+                let initial_project_id = pending_participant.initial_project_id?;
+                participant
+                    .projects
+                    .iter()
+                    .find(|project| project.id == initial_project_id)
+                    .cloned()
+            }),
+        })
+    }
+
     async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result<proto::Room> {
         let db_room = room::Entity::find_by_id(room_id)
             .one(tx)
@@ -1057,6 +1465,736 @@ impl Database {
         })
     }
 
+    // projects
+
+    pub async fn project_count_excluding_admins(&self) -> Result<usize> {
+        #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
+        enum QueryAs {
+            Count,
+        }
+
+        self.transact(|tx| async move {
+            Ok(project::Entity::find()
+                .select_only()
+                .column_as(project::Column::Id.count(), QueryAs::Count)
+                .inner_join(user::Entity)
+                .filter(user::Column::Admin.eq(false))
+                .into_values::<_, QueryAs>()
+                .one(&tx)
+                .await?
+                .unwrap_or(0) as usize)
+        })
+        .await
+    }
+
+    pub async fn share_project(
+        &self,
+        room_id: RoomId,
+        connection_id: ConnectionId,
+        worktrees: &[proto::WorktreeMetadata],
+    ) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
+        self.transact(|tx| async move {
+            let participant = room_participant::Entity::find()
+                .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0))
+                .one(&tx)
+                .await?
+                .ok_or_else(|| anyhow!("could not find participant"))?;
+            if participant.room_id != room_id {
+                return Err(anyhow!("shared project on unexpected room"))?;
+            }
+
+            let project = project::ActiveModel {
+                room_id: ActiveValue::set(participant.room_id),
+                host_user_id: ActiveValue::set(participant.user_id),
+                host_connection_id: ActiveValue::set(connection_id.0 as i32),
+                ..Default::default()
+            }
+            .insert(&tx)
+            .await?;
+
+            worktree::Entity::insert_many(worktrees.iter().map(|worktree| worktree::ActiveModel {
+                id: ActiveValue::set(worktree.id as i32),
+                project_id: ActiveValue::set(project.id),
+                abs_path: ActiveValue::set(worktree.abs_path.clone()),
+                root_name: ActiveValue::set(worktree.root_name.clone()),
+                visible: ActiveValue::set(worktree.visible),
+                scan_id: ActiveValue::set(0),
+                is_complete: ActiveValue::set(false),
+            }))
+            .exec(&tx)
+            .await?;
+
+            project_collaborator::ActiveModel {
+                project_id: ActiveValue::set(project.id),
+                connection_id: ActiveValue::set(connection_id.0 as i32),
+                user_id: ActiveValue::set(participant.user_id),
+                replica_id: ActiveValue::set(ReplicaId(0)),
+                is_host: ActiveValue::set(true),
+                ..Default::default()
+            }
+            .insert(&tx)
+            .await?;
+
+            let room = self.get_room(room_id, &tx).await?;
+            self.commit_room_transaction(room_id, tx, (project.id, room))
+                .await
+        })
+        .await
+    }
+
+    pub async fn unshare_project(
+        &self,
+        project_id: ProjectId,
+        connection_id: ConnectionId,
+    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
+        self.transact(|tx| async move {
+            todo!()
+            // let guest_connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?;
+            // let room_id: RoomId = sqlx::query_scalar(
+            //     "
+            //     DELETE FROM projects
+            //     WHERE id = $1 AND host_connection_id = $2
+            //     RETURNING room_id
+            //     ",
+            // )
+            // .bind(project_id)
+            // .bind(connection_id.0 as i32)
+            // .fetch_one(&mut tx)
+            // .await?;
+            // let room = self.get_room(room_id, &mut tx).await?;
+            // self.commit_room_transaction(room_id, tx, (room, guest_connection_ids))
+            //     .await
+        })
+        .await
+    }
+
+    pub async fn update_project(
+        &self,
+        project_id: ProjectId,
+        connection_id: ConnectionId,
+        worktrees: &[proto::WorktreeMetadata],
+    ) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
+        self.transact(|tx| async move {
+            todo!()
+            // let room_id: RoomId = sqlx::query_scalar(
+            //     "
+            //     SELECT room_id
+            //     FROM projects
+            //     WHERE id = $1 AND host_connection_id = $2
+            //     ",
+            // )
+            // .bind(project_id)
+            // .bind(connection_id.0 as i32)
+            // .fetch_one(&mut tx)
+            // .await?;
+
+            // if !worktrees.is_empty() {
+            //     let mut params = "(?, ?, ?, ?, ?, ?, ?),".repeat(worktrees.len());
+            //     params.pop();
+            //     let query = format!(
+            //         "
+            //         INSERT INTO worktrees (
+            //         project_id,
+            //         id,
+            //         root_name,
+            //         abs_path,
+            //         visible,
+            //         scan_id,
+            //         is_complete
+            //         )
+            //         VALUES {params}
+            //         ON CONFLICT (project_id, id) DO UPDATE SET root_name = excluded.root_name
+            //         "
+            //     );
+
+            //     let mut query = sqlx::query(&query);
+            //     for worktree in worktrees {
+            //         query = query
+            //             .bind(project_id)
+            //             .bind(worktree.id as i32)
+            //             .bind(&worktree.root_name)
+            //             .bind(&worktree.abs_path)
+            //             .bind(worktree.visible)
+            //             .bind(0)
+            //             .bind(false)
+            //     }
+            //     query.execute(&mut tx).await?;
+            // }
+
+            // let mut params = "?,".repeat(worktrees.len());
+            // if !worktrees.is_empty() {
+            //     params.pop();
+            // }
+            // let query = format!(
+            //     "
+            //     DELETE FROM worktrees
+            //     WHERE project_id = ? AND id NOT IN ({params})
+            //     ",
+            // );
+
+            // let mut query = sqlx::query(&query).bind(project_id);
+            // for worktree in worktrees {
+            //     query = query.bind(WorktreeId(worktree.id as i32));
+            // }
+            // query.execute(&mut tx).await?;
+
+            // let guest_connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?;
+            // let room = self.get_room(room_id, &mut tx).await?;
+            // self.commit_room_transaction(room_id, tx, (room, guest_connection_ids))
+            //     .await
+        })
+        .await
+    }
+
+    pub async fn update_worktree(
+        &self,
+        update: &proto::UpdateWorktree,
+        connection_id: ConnectionId,
+    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
+        self.transact(|tx| async move {
+            todo!()
+            // let project_id = ProjectId::from_proto(update.project_id);
+            // let worktree_id = WorktreeId::from_proto(update.worktree_id);
+
+            // // Ensure the update comes from the host.
+            // let room_id: RoomId = sqlx::query_scalar(
+            //     "
+            //     SELECT room_id
+            //     FROM projects
+            //     WHERE id = $1 AND host_connection_id = $2
+            //     ",
+            // )
+            // .bind(project_id)
+            // .bind(connection_id.0 as i32)
+            // .fetch_one(&mut tx)
+            // .await?;
+
+            // // Update metadata.
+            // sqlx::query(
+            //     "
+            //     UPDATE worktrees
+            //     SET
+            //     root_name = $1,
+            //     scan_id = $2,
+            //     is_complete = $3,
+            //     abs_path = $4
+            //     WHERE project_id = $5 AND id = $6
+            //     RETURNING 1
+            //     ",
+            // )
+            // .bind(&update.root_name)
+            // .bind(update.scan_id as i64)
+            // .bind(update.is_last_update)
+            // .bind(&update.abs_path)
+            // .bind(project_id)
+            // .bind(worktree_id)
+            // .fetch_one(&mut tx)
+            // .await?;
+
+            // if !update.updated_entries.is_empty() {
+            //     let mut params =
+            //         "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?),".repeat(update.updated_entries.len());
+            //     params.pop();
+
+            //     let query = format!(
+            //         "
+            //         INSERT INTO worktree_entries (
+            //         project_id,
+            //         worktree_id,
+            //         id,
+            //         is_dir,
+            //         path,
+            //         inode,
+            //         mtime_seconds,
+            //         mtime_nanos,
+            //         is_symlink,
+            //         is_ignored
+            //         )
+            //         VALUES {params}
+            //         ON CONFLICT (project_id, worktree_id, id) DO UPDATE SET
+            //         is_dir = excluded.is_dir,
+            //         path = excluded.path,
+            //         inode = excluded.inode,
+            //         mtime_seconds = excluded.mtime_seconds,
+            //         mtime_nanos = excluded.mtime_nanos,
+            //         is_symlink = excluded.is_symlink,
+            //         is_ignored = excluded.is_ignored
+            //         "
+            //     );
+            //     let mut query = sqlx::query(&query);
+            //     for entry in &update.updated_entries {
+            //         let mtime = entry.mtime.clone().unwrap_or_default();
+            //         query = query
+            //             .bind(project_id)
+            //             .bind(worktree_id)
+            //             .bind(entry.id as i64)
+            //             .bind(entry.is_dir)
+            //             .bind(&entry.path)
+            //             .bind(entry.inode as i64)
+            //             .bind(mtime.seconds as i64)
+            //             .bind(mtime.nanos as i32)
+            //             .bind(entry.is_symlink)
+            //             .bind(entry.is_ignored);
+            //     }
+            //     query.execute(&mut tx).await?;
+            // }
+
+            // if !update.removed_entries.is_empty() {
+            //     let mut params = "?,".repeat(update.removed_entries.len());
+            //     params.pop();
+            //     let query = format!(
+            //         "
+            //         DELETE FROM worktree_entries
+            //         WHERE project_id = ? AND worktree_id = ? AND id IN ({params})
+            //         "
+            //     );
+
+            //     let mut query = sqlx::query(&query).bind(project_id).bind(worktree_id);
+            //     for entry_id in &update.removed_entries {
+            //         query = query.bind(*entry_id as i64);
+            //     }
+            //     query.execute(&mut tx).await?;
+            // }
+
+            // let connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?;
+            // self.commit_room_transaction(room_id, tx, connection_ids)
+            //     .await
+        })
+        .await
+    }
+
+    pub async fn update_diagnostic_summary(
+        &self,
+        update: &proto::UpdateDiagnosticSummary,
+        connection_id: ConnectionId,
+    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
+        self.transact(|tx| async {
+            todo!()
+            // let project_id = ProjectId::from_proto(update.project_id);
+            // let worktree_id = WorktreeId::from_proto(update.worktree_id);
+            // let summary = update
+            //     .summary
+            //     .as_ref()
+            //     .ok_or_else(|| anyhow!("invalid summary"))?;
+
+            // // Ensure the update comes from the host.
+            // let room_id: RoomId = sqlx::query_scalar(
+            //     "
+            //     SELECT room_id
+            //     FROM projects
+            //     WHERE id = $1 AND host_connection_id = $2
+            //     ",
+            // )
+            // .bind(project_id)
+            // .bind(connection_id.0 as i32)
+            // .fetch_one(&mut tx)
+            // .await?;
+
+            // // Update summary.
+            // sqlx::query(
+            //     "
+            //     INSERT INTO worktree_diagnostic_summaries (
+            //     project_id,
+            //     worktree_id,
+            //     path,
+            //     language_server_id,
+            //     error_count,
+            //     warning_count
+            //     )
+            //     VALUES ($1, $2, $3, $4, $5, $6)
+            //     ON CONFLICT (project_id, worktree_id, path) DO UPDATE SET
+            //     language_server_id = excluded.language_server_id,
+            //     error_count = excluded.error_count,
+            //     warning_count = excluded.warning_count
+            //     ",
+            // )
+            // .bind(project_id)
+            // .bind(worktree_id)
+            // .bind(&summary.path)
+            // .bind(summary.language_server_id as i64)
+            // .bind(summary.error_count as i32)
+            // .bind(summary.warning_count as i32)
+            // .execute(&mut tx)
+            // .await?;
+
+            // let connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?;
+            // self.commit_room_transaction(room_id, tx, connection_ids)
+            //     .await
+        })
+        .await
+    }
+
+    pub async fn start_language_server(
+        &self,
+        update: &proto::StartLanguageServer,
+        connection_id: ConnectionId,
+    ) -> Result<RoomGuard<Vec<ConnectionId>>> {
+        self.transact(|tx| async {
+            todo!()
+            // let project_id = ProjectId::from_proto(update.project_id);
+            // let server = update
+            //     .server
+            //     .as_ref()
+            //     .ok_or_else(|| anyhow!("invalid language server"))?;
+
+            // // Ensure the update comes from the host.
+            // let room_id: RoomId = sqlx::query_scalar(
+            //     "
+            //     SELECT room_id
+            //     FROM projects
+            //     WHERE id = $1 AND host_connection_id = $2
+            //     ",
+            // )
+            // .bind(project_id)
+            // .bind(connection_id.0 as i32)
+            // .fetch_one(&mut tx)
+            // .await?;
+
+            // // Add the newly-started language server.
+            // sqlx::query(
+            //     "
+            //     INSERT INTO language_servers (project_id, id, name)
+            //     VALUES ($1, $2, $3)
+            //     ON CONFLICT (project_id, id) DO UPDATE SET
+            //     name = excluded.name
+            //     ",
+            // )
+            // .bind(project_id)
+            // .bind(server.id as i64)
+            // .bind(&server.name)
+            // .execute(&mut tx)
+            // .await?;
+
+            // let connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?;
+            // self.commit_room_transaction(room_id, tx, connection_ids)
+            //     .await
+        })
+        .await
+    }
+
+    pub async fn join_project(
+        &self,
+        project_id: ProjectId,
+        connection_id: ConnectionId,
+    ) -> Result<RoomGuard<(Project, ReplicaId)>> {
+        self.transact(|tx| async move {
+            todo!()
+            // let (room_id, user_id) = sqlx::query_as::<_, (RoomId, UserId)>(
+            //     "
+            //     SELECT room_id, user_id
+            //     FROM room_participants
+            //     WHERE answering_connection_id = $1
+            //     ",
+            // )
+            // .bind(connection_id.0 as i32)
+            // .fetch_one(&mut tx)
+            // .await?;
+
+            // // Ensure project id was shared on this room.
+            // sqlx::query(
+            //     "
+            //     SELECT 1
+            //     FROM projects
+            //     WHERE id = $1 AND room_id = $2
+            //     ",
+            // )
+            // .bind(project_id)
+            // .bind(room_id)
+            // .fetch_one(&mut tx)
+            // .await?;
+
+            // let mut collaborators = sqlx::query_as::<_, ProjectCollaborator>(
+            //     "
+            //     SELECT *
+            //     FROM project_collaborators
+            //     WHERE project_id = $1
+            //     ",
+            // )
+            // .bind(project_id)
+            // .fetch_all(&mut tx)
+            // .await?;
+            // let replica_ids = collaborators
+            //     .iter()
+            //     .map(|c| c.replica_id)
+            //     .collect::<HashSet<_>>();
+            // let mut replica_id = ReplicaId(1);
+            // while replica_ids.contains(&replica_id) {
+            //     replica_id.0 += 1;
+            // }
+            // let new_collaborator = ProjectCollaborator {
+            //     project_id,
+            //     connection_id: connection_id.0 as i32,
+            //     user_id,
+            //     replica_id,
+            //     is_host: false,
+            // };
+
+            // sqlx::query(
+            //     "
+            //     INSERT INTO project_collaborators (
+            //     project_id,
+            //     connection_id,
+            //     user_id,
+            //     replica_id,
+            //     is_host
+            //     )
+            //     VALUES ($1, $2, $3, $4, $5)
+            //     ",
+            // )
+            // .bind(new_collaborator.project_id)
+            // .bind(new_collaborator.connection_id)
+            // .bind(new_collaborator.user_id)
+            // .bind(new_collaborator.replica_id)
+            // .bind(new_collaborator.is_host)
+            // .execute(&mut tx)
+            // .await?;
+            // collaborators.push(new_collaborator);
+
+            // let worktree_rows = sqlx::query_as::<_, WorktreeRow>(
+            //     "
+            //     SELECT *
+            //     FROM worktrees
+            //     WHERE project_id = $1
+            //     ",
+            // )
+            // .bind(project_id)
+            // .fetch_all(&mut tx)
+            // .await?;
+            // let mut worktrees = worktree_rows
+            //     .into_iter()
+            //     .map(|worktree_row| {
+            //         (
+            //             worktree_row.id,
+            //             Worktree {
+            //                 id: worktree_row.id,
+            //                 abs_path: worktree_row.abs_path,
+            //                 root_name: worktree_row.root_name,
+            //                 visible: worktree_row.visible,
+            //                 entries: Default::default(),
+            //                 diagnostic_summaries: Default::default(),
+            //                 scan_id: worktree_row.scan_id as u64,
+            //                 is_complete: worktree_row.is_complete,
+            //             },
+            //         )
+            //     })
+            //     .collect::<BTreeMap<_, _>>();
+
+            // // Populate worktree entries.
+            // {
+            //     let mut entries = sqlx::query_as::<_, WorktreeEntry>(
+            //         "
+            //         SELECT *
+            //         FROM worktree_entries
+            //         WHERE project_id = $1
+            //         ",
+            //     )
+            //     .bind(project_id)
+            //     .fetch(&mut tx);
+            //     while let Some(entry) = entries.next().await {
+            //         let entry = entry?;
+            //         if let Some(worktree) = worktrees.get_mut(&entry.worktree_id) {
+            //             worktree.entries.push(proto::Entry {
+            //                 id: entry.id as u64,
+            //                 is_dir: entry.is_dir,
+            //                 path: entry.path,
+            //                 inode: entry.inode as u64,
+            //                 mtime: Some(proto::Timestamp {
+            //                     seconds: entry.mtime_seconds as u64,
+            //                     nanos: entry.mtime_nanos as u32,
+            //                 }),
+            //                 is_symlink: entry.is_symlink,
+            //                 is_ignored: entry.is_ignored,
+            //             });
+            //         }
+            //     }
+            // }
+
+            // // Populate worktree diagnostic summaries.
+            // {
+            //     let mut summaries = sqlx::query_as::<_, WorktreeDiagnosticSummary>(
+            //         "
+            //         SELECT *
+            //         FROM worktree_diagnostic_summaries
+            //         WHERE project_id = $1
+            //         ",
+            //     )
+            //     .bind(project_id)
+            //     .fetch(&mut tx);
+            //     while let Some(summary) = summaries.next().await {
+            //         let summary = summary?;
+            //         if let Some(worktree) = worktrees.get_mut(&summary.worktree_id) {
+            //             worktree
+            //                 .diagnostic_summaries
+            //                 .push(proto::DiagnosticSummary {
+            //                     path: summary.path,
+            //                     language_server_id: summary.language_server_id as u64,
+            //                     error_count: summary.error_count as u32,
+            //                     warning_count: summary.warning_count as u32,
+            //                 });
+            //         }
+            //     }
+            // }
+
+            // // Populate language servers.
+            // let language_servers = sqlx::query_as::<_, LanguageServer>(
+            //     "
+            //     SELECT *
+            //     FROM language_servers
+            //     WHERE project_id = $1
+            //     ",
+            // )
+            // .bind(project_id)
+            // .fetch_all(&mut tx)
+            // .await?;
+
+            // self.commit_room_transaction(
+            //     room_id,
+            //     tx,
+            //     (
+            //         Project {
+            //             collaborators,
+            //             worktrees,
+            //             language_servers: language_servers
+            //                 .into_iter()
+            //                 .map(|language_server| proto::LanguageServer {
+            //                     id: language_server.id.to_proto(),
+            //                     name: language_server.name,
+            //                 })
+            //                 .collect(),
+            //         },
+            //         replica_id as ReplicaId,
+            //     ),
+            // )
+            // .await
+        })
+        .await
+    }
+
+    pub async fn leave_project(
+        &self,
+        project_id: ProjectId,
+        connection_id: ConnectionId,
+    ) -> Result<RoomGuard<LeftProject>> {
+        self.transact(|tx| async move {
+            todo!()
+            // let result = sqlx::query(
+            //     "
+            //     DELETE FROM project_collaborators
+            //     WHERE project_id = $1 AND connection_id = $2
+            //     ",
+            // )
+            // .bind(project_id)
+            // .bind(connection_id.0 as i32)
+            // .execute(&mut tx)
+            // .await?;
+
+            // if result.rows_affected() == 0 {
+            //     Err(anyhow!("not a collaborator on this project"))?;
+            // }
+
+            // let connection_ids = sqlx::query_scalar::<_, i32>(
+            //     "
+            //     SELECT connection_id
+            //     FROM project_collaborators
+            //     WHERE project_id = $1
+            //     ",
+            // )
+            // .bind(project_id)
+            // .fetch_all(&mut tx)
+            // .await?
+            // .into_iter()
+            // .map(|id| ConnectionId(id as u32))
+            // .collect();
+
+            // let (room_id, host_user_id, host_connection_id) =
+            //     sqlx::query_as::<_, (RoomId, i32, i32)>(
+            //         "
+            //         SELECT room_id, host_user_id, host_connection_id
+            //         FROM projects
+            //         WHERE id = $1
+            //         ",
+            //     )
+            //     .bind(project_id)
+            //     .fetch_one(&mut tx)
+            //     .await?;
+
+            // self.commit_room_transaction(
+            //     room_id,
+            //     tx,
+            //     LeftProject {
+            //         id: project_id,
+            //         host_user_id: UserId(host_user_id),
+            //         host_connection_id: ConnectionId(host_connection_id as u32),
+            //         connection_ids,
+            //     },
+            // )
+            // .await
+        })
+        .await
+    }
+
+    pub async fn project_collaborators(
+        &self,
+        project_id: ProjectId,
+        connection_id: ConnectionId,
+    ) -> Result<Vec<project_collaborator::Model>> {
+        self.transact(|tx| async move {
+            todo!()
+            // let collaborators = sqlx::query_as::<_, ProjectCollaborator>(
+            //     "
+            //     SELECT *
+            //     FROM project_collaborators
+            //     WHERE project_id = $1
+            //     ",
+            // )
+            // .bind(project_id)
+            // .fetch_all(&mut tx)
+            // .await?;
+
+            // if collaborators
+            //     .iter()
+            //     .any(|collaborator| collaborator.connection_id == connection_id.0 as i32)
+            // {
+            //     Ok(collaborators)
+            // } else {
+            //     Err(anyhow!("no such project"))?
+            // }
+        })
+        .await
+    }
+
+    pub async fn project_connection_ids(
+        &self,
+        project_id: ProjectId,
+        connection_id: ConnectionId,
+    ) -> Result<HashSet<ConnectionId>> {
+        self.transact(|tx| async move {
+            todo!()
+            // let connection_ids = sqlx::query_scalar::<_, i32>(
+            //     "
+            //     SELECT connection_id
+            //     FROM project_collaborators
+            //     WHERE project_id = $1
+            //     ",
+            // )
+            // .bind(project_id)
+            // .fetch_all(&mut tx)
+            // .await?;
+
+            // if connection_ids.contains(&(connection_id.0 as i32)) {
+            //     Ok(connection_ids
+            //         .into_iter()
+            //         .map(|connection_id| ConnectionId(connection_id as u32))
+            //         .collect())
+            // } else {
+            //     Err(anyhow!("no such project"))?
+            // }
+        })
+        .await
+    }
+
+    // access tokens
+
     pub async fn create_access_token_hash(
         &self,
         user_id: UserId,

crates/collab/src/db/project.rs 🔗

@@ -13,6 +13,12 @@ pub struct Model {
 
 #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
 pub enum Relation {
+    #[sea_orm(
+        belongs_to = "super::user::Entity",
+        from = "Column::HostUserId",
+        to = "super::user::Column::Id"
+    )]
+    HostUser,
     #[sea_orm(
         belongs_to = "super::room::Entity",
         from = "Column::RoomId",
@@ -23,6 +29,12 @@ pub enum Relation {
     Worktree,
 }
 
+impl Related<super::user::Entity> for Entity {
+    fn to() -> RelationDef {
+        Relation::HostUser.def()
+    }
+}
+
 impl Related<super::room::Entity> for Entity {
     fn to() -> RelationDef {
         Relation::Room.def()

crates/collab/src/db/project_collaborator.rs 🔗

@@ -1,4 +1,4 @@
-use super::{ProjectCollaboratorId, ProjectId, UserId};
+use super::{ProjectCollaboratorId, ProjectId, ReplicaId, UserId};
 use sea_orm::entity::prelude::*;
 
 #[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)]
@@ -9,7 +9,7 @@ pub struct Model {
     pub project_id: ProjectId,
     pub connection_id: i32,
     pub user_id: UserId,
-    pub replica_id: i32,
+    pub replica_id: ReplicaId,
     pub is_host: bool,
 }
 

crates/collab/src/db/user.rs 🔗

@@ -24,6 +24,8 @@ pub enum Relation {
     AccessToken,
     #[sea_orm(has_one = "super::room_participant::Entity")]
     RoomParticipant,
+    #[sea_orm(has_many = "super::project::Entity")]
+    HostedProjects,
 }
 
 impl Related<super::access_token::Entity> for Entity {
@@ -38,4 +40,10 @@ impl Related<super::room_participant::Entity> for Entity {
     }
 }
 
+impl Related<super::project::Entity> for Entity {
+    fn to() -> RelationDef {
+        Relation::HostedProjects.def()
+    }
+}
+
 impl ActiveModelBehavior for ActiveModel {}