Move `Store::update_worktree` to `Db::update_worktree`

Antonio Scandurra created

Change summary

crates/collab/migrations.sqlite/20221109000000_test_schema.sql |  12 
crates/collab/src/db.rs                                        | 126 ++++
crates/collab/src/rpc.rs                                       |  17 
crates/collab/src/rpc/store.rs                                 |  51 -
4 files changed, 139 insertions(+), 67 deletions(-)

Detailed changes

crates/collab/migrations.sqlite/20221109000000_test_schema.sql 🔗

@@ -61,8 +61,8 @@ CREATE INDEX "index_worktrees_on_project_id" ON "worktrees" ("project_id");
 
 CREATE TABLE "worktree_entries" (
     "id" INTEGER NOT NULL,
-    "project_id" INTEGER NOT NULL REFERENCES projects (id),
-    "worktree_id" INTEGER NOT NULL REFERENCES worktrees (id),
+    "project_id" INTEGER NOT NULL,
+    "worktree_id" INTEGER NOT NULL,
     "is_dir" BOOL NOT NULL,
     "path" VARCHAR NOT NULL,
     "inode" INTEGER NOT NULL,
@@ -71,17 +71,19 @@ CREATE TABLE "worktree_entries" (
     "is_symlink" BOOL NOT NULL,
     "is_ignored" BOOL NOT NULL,
     PRIMARY KEY(project_id, worktree_id, id)
+    FOREIGN KEY(project_id, worktree_id) REFERENCES worktrees (project_id, id)
 );
 CREATE INDEX "index_worktree_entries_on_project_id_and_worktree_id" ON "worktree_entries" ("project_id", "worktree_id");
 
 CREATE TABLE "worktree_diagnostic_summaries" (
     "path" VARCHAR NOT NULL,
-    "project_id" INTEGER NOT NULL REFERENCES projects (id),
-    "worktree_id" INTEGER NOT NULL REFERENCES worktrees (id),
+    "project_id" INTEGER NOT NULL,
+    "worktree_id" INTEGER NOT NULL,
     "language_server_id" INTEGER NOT NULL,
     "error_count" INTEGER NOT NULL,
     "warning_count" INTEGER NOT NULL,
-    PRIMARY KEY(project_id, worktree_id, path)
+    PRIMARY KEY(project_id, worktree_id, path),
+    FOREIGN KEY(project_id, worktree_id) REFERENCES worktrees (project_id, id)
 );
 CREATE INDEX "index_worktree_diagnostic_summaries_on_project_id_and_worktree_id" ON "worktree_diagnostic_summaries" ("project_id", "worktree_id");
 

crates/collab/src/db.rs 🔗

@@ -1556,6 +1556,132 @@ where
         .await
     }
 
+    pub async fn update_worktree(
+        &self,
+        update: &proto::UpdateWorktree,
+        connection_id: ConnectionId,
+    ) -> Result<Vec<ConnectionId>> {
+        self.transact(|mut tx| async move {
+            let project_id = ProjectId::from_proto(update.project_id);
+            let worktree_id = WorktreeId::from_proto(update.worktree_id);
+
+            // Ensure the update comes from the host.
+            sqlx::query(
+                "
+                SELECT 1
+                FROM projects
+                WHERE id = $1 AND host_connection_id = $2
+                ",
+            )
+            .bind(project_id)
+            .bind(connection_id.0 as i32)
+            .fetch_one(&mut tx)
+            .await?;
+
+            // Update metadata.
+            sqlx::query(
+                "
+                UPDATE worktrees
+                SET
+                    root_name = $1,
+                    scan_id = $2,
+                    is_complete = $3,
+                    abs_path = $4
+                WHERE project_id = $5 AND id = $6
+                RETURNING 1
+                ",
+            )
+            .bind(&update.root_name)
+            .bind(update.scan_id as i64)
+            .bind(update.is_last_update)
+            .bind(&update.abs_path)
+            .bind(project_id)
+            .bind(worktree_id)
+            .fetch_one(&mut tx)
+            .await?;
+
+            if !update.updated_entries.is_empty() {
+                let mut params =
+                    "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?),".repeat(update.updated_entries.len());
+                params.pop();
+
+                let query = format!(
+                    "
+                    INSERT INTO worktree_entries (
+                        project_id, 
+                        worktree_id, 
+                        id, 
+                        is_dir, 
+                        path, 
+                        inode,
+                        mtime_seconds, 
+                        mtime_nanos, 
+                        is_symlink, 
+                        is_ignored
+                    )
+                    VALUES {params}
+                    "
+                );
+                let mut query = sqlx::query(&query);
+                for entry in &update.updated_entries {
+                    let mtime = entry.mtime.clone().unwrap_or_default();
+                    query = query
+                        .bind(project_id)
+                        .bind(worktree_id)
+                        .bind(entry.id as i64)
+                        .bind(entry.is_dir)
+                        .bind(&entry.path)
+                        .bind(entry.inode as i64)
+                        .bind(mtime.seconds as i64)
+                        .bind(mtime.nanos as i32)
+                        .bind(entry.is_symlink)
+                        .bind(entry.is_ignored);
+                }
+                query.execute(&mut tx).await?;
+            }
+
+            if !update.removed_entries.is_empty() {
+                let mut params = "(?, ?, ?),".repeat(update.removed_entries.len());
+                params.pop();
+                let query = format!(
+                    "
+                    DELETE FROM worktree_entries
+                    WHERE (project_id, worktree_id, entry_id) IN ({params})
+                    "
+                );
+
+                let mut query = sqlx::query(&query);
+                for entry_id in &update.removed_entries {
+                    query = query
+                        .bind(project_id)
+                        .bind(worktree_id)
+                        .bind(*entry_id as i64);
+                }
+                query.execute(&mut tx).await?;
+            }
+
+            let connection_ids = sqlx::query_scalar::<_, i32>(
+                "
+                SELECT connection_id
+                FROM project_collaborators
+                WHERE project_id = $1 AND connection_id != $2
+                ",
+            )
+            .bind(project_id)
+            .bind(connection_id.0 as i32)
+            .fetch_all(&mut tx)
+            .await?;
+
+            tx.commit().await?;
+
+            Ok(connection_ids
+                .into_iter()
+                .map(|connection_id| ConnectionId(connection_id as u32))
+                .collect())
+        })
+        .await
+    }
+
     pub async fn join_project(
         &self,
         project_id: ProjectId,

crates/collab/src/rpc.rs 🔗

@@ -1105,18 +1105,11 @@ impl Server {
         request: Message<proto::UpdateWorktree>,
         response: Response<proto::UpdateWorktree>,
     ) -> Result<()> {
-        let project_id = ProjectId::from_proto(request.payload.project_id);
-        let worktree_id = request.payload.worktree_id;
-        let connection_ids = self.store().await.update_worktree(
-            request.sender_connection_id,
-            project_id,
-            worktree_id,
-            &request.payload.root_name,
-            &request.payload.removed_entries,
-            &request.payload.updated_entries,
-            request.payload.scan_id,
-            request.payload.is_last_update,
-        )?;
+        let connection_ids = self
+            .app_state
+            .db
+            .update_worktree(&request.payload, request.sender_connection_id)
+            .await?;
 
         broadcast(
             request.sender_connection_id,

crates/collab/src/rpc/store.rs 🔗

@@ -3,7 +3,7 @@ use anyhow::{anyhow, Result};
 use collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet};
 use rpc::{proto, ConnectionId};
 use serde::Serialize;
-use std::{path::PathBuf, str};
+use std::path::PathBuf;
 use tracing::instrument;
 
 pub type RoomId = u64;
@@ -325,37 +325,6 @@ impl Store {
         })
     }
 
-    #[allow(clippy::too_many_arguments)]
-    pub fn update_worktree(
-        &mut self,
-        connection_id: ConnectionId,
-        project_id: ProjectId,
-        worktree_id: u64,
-        worktree_root_name: &str,
-        removed_entries: &[u64],
-        updated_entries: &[proto::Entry],
-        scan_id: u64,
-        is_last_update: bool,
-    ) -> Result<Vec<ConnectionId>> {
-        let project = self.write_project(project_id, connection_id)?;
-
-        let connection_ids = project.connection_ids();
-        let mut worktree = project.worktrees.entry(worktree_id).or_default();
-        worktree.root_name = worktree_root_name.to_string();
-
-        for entry_id in removed_entries {
-            worktree.entries.remove(entry_id);
-        }
-
-        for entry in updated_entries {
-            worktree.entries.insert(entry.id, entry.clone());
-        }
-
-        worktree.scan_id = scan_id;
-        worktree.is_complete = is_last_update;
-        Ok(connection_ids)
-    }
-
     pub fn project_connection_ids(
         &self,
         project_id: ProjectId,
@@ -384,24 +353,6 @@ impl Store {
         }
     }
 
-    fn write_project(
-        &mut self,
-        project_id: ProjectId,
-        connection_id: ConnectionId,
-    ) -> Result<&mut Project> {
-        let project = self
-            .projects
-            .get_mut(&project_id)
-            .ok_or_else(|| anyhow!("no such project"))?;
-        if project.host_connection_id == connection_id
-            || project.guests.contains_key(&connection_id)
-        {
-            Ok(project)
-        } else {
-            Err(anyhow!("no such project"))?
-        }
-    }
-
     #[cfg(test)]
     pub fn check_invariants(&self) {
         for (connection_id, connection) in &self.connections {