From bc1c0a22974e13f8ef75d0db1ca93e7c51077237 Mon Sep 17 00:00:00 2001 From: Cole Miller Date: Thu, 20 Mar 2025 18:07:03 -0400 Subject: [PATCH] Separate repository state synchronization from worktree synchronization (#27140) This PR updates our DB schemas and wire protocol to separate the synchronization of git statuses and other repository state from the synchronization of worktrees. This paves the way for moving the code that executes git status updates out of the `worktree` crate and onto the new `GitStore`. That end goal is motivated by two (related) points: - Disentangling git status updates from the worktree's `BackgroundScanner` will allow us to implement a simpler concurrency story for those updates, hopefully fixing some known but elusive bugs (upstream state not updating after push; statuses getting out of sync in remote projects). - By moving git repository state to the project-scoped `GitStore`, we can get rid of the duplication that currently happens when two worktrees are associated with the same git repository. Co-authored-by: Max Release Notes: - N/A --------- Co-authored-by: Max Co-authored-by: Max Brunsfeld --- crates/call/src/cross_platform/room.rs | 27 +- crates/call/src/macos/room.rs | 27 +- .../20221109000000_test_schema.sql | 144 ++++-- ...0319182812_create_project_repositories.sql | 32 ++ crates/collab/src/db.rs | 9 +- crates/collab/src/db/queries/projects.rs | 462 ++++++++++++------ crates/collab/src/db/queries/rooms.rs | 177 ++++--- crates/collab/src/db/tables.rs | 4 +- crates/collab/src/db/tables/project.rs | 8 + ...ee_repository.rs => project_repository.rs} | 26 +- ...uses.rs => project_repository_statuses.rs} | 6 +- crates/collab/src/rpc.rs | 79 ++- crates/collab/src/tests/integration_tests.rs | 2 +- crates/project/src/connection_manager.rs | 27 +- crates/project/src/project.rs | 59 ++- crates/project/src/worktree_store.rs | 42 +- crates/proto/proto/zed.proto | 35 +- crates/proto/src/proto.rs | 73 ++- .../remote_server/src/remote_editing_tests.rs | 6 +- crates/worktree/src/worktree.rs | 418 +++++++++------- crates/worktree/src/worktree_tests.rs | 15 +- 21 files changed, 1145 insertions(+), 533 deletions(-) create mode 100644 crates/collab/migrations/20250319182812_create_project_repositories.sql rename crates/collab/src/db/tables/{worktree_repository.rs => project_repository.rs} (51%) rename crates/collab/src/db/tables/{worktree_repository_statuses.rs => project_repository_statuses.rs} (88%) diff --git a/crates/call/src/cross_platform/room.rs b/crates/call/src/cross_platform/room.rs index a06460094afb8edeede003a82a2a3738c4338fcd..ebed7439cc017248a3a8168e5e8bb06f566870f3 100644 --- a/crates/call/src/cross_platform/room.rs +++ b/crates/call/src/cross_platform/room.rs @@ -469,18 +469,25 @@ impl Room { let project = handle.read(cx); if let Some(project_id) = project.remote_id() { projects.insert(project_id, handle.clone()); + let mut worktrees = Vec::new(); + let mut repositories = Vec::new(); + for worktree in project.worktrees(cx) { + let worktree = worktree.read(cx); + worktrees.push(proto::RejoinWorktree { + id: worktree.id().to_proto(), + scan_id: worktree.completed_scan_id() as u64, + }); + for repository in worktree.repositories().iter() { + repositories.push(proto::RejoinRepository { + id: repository.work_directory_id().to_proto(), + scan_id: worktree.completed_scan_id() as u64, + }); + } + } rejoined_projects.push(proto::RejoinProject { id: project_id, - worktrees: project - .worktrees(cx) - .map(|worktree| { - let worktree = worktree.read(cx); - 
proto::RejoinWorktree { - id: worktree.id().to_proto(), - scan_id: worktree.completed_scan_id() as u64, - } - }) - .collect(), + worktrees, + repositories, }); } return true; diff --git a/crates/call/src/macos/room.rs b/crates/call/src/macos/room.rs index 0fa916e1c8a9419d4dd5850ce3fe06e4de84d846..de5d4b927bb6608fdc505421748e4b7a84c46167 100644 --- a/crates/call/src/macos/room.rs +++ b/crates/call/src/macos/room.rs @@ -524,18 +524,25 @@ impl Room { let project = handle.read(cx); if let Some(project_id) = project.remote_id() { projects.insert(project_id, handle.clone()); + let mut worktrees = Vec::new(); + let mut repositories = Vec::new(); + for worktree in project.worktrees(cx) { + let worktree = worktree.read(cx); + worktrees.push(proto::RejoinWorktree { + id: worktree.id().to_proto(), + scan_id: worktree.completed_scan_id() as u64, + }); + for repository in worktree.repositories().iter() { + repositories.push(proto::RejoinRepository { + id: repository.work_directory_id().to_proto(), + scan_id: worktree.completed_scan_id() as u64, + }); + } + } rejoined_projects.push(proto::RejoinProject { id: project_id, - worktrees: project - .worktrees(cx) - .map(|worktree| { - let worktree = worktree.read(cx); - proto::RejoinWorktree { - id: worktree.id().to_proto(), - scan_id: worktree.completed_scan_id() as u64, - } - }) - .collect(), + worktrees, + repositories, }); } return true; diff --git a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql index 750a21818c3e28a18c75a7270880e2654e4c339f..82dd19dec53001435af60e42ccefd4499ed20f80 100644 --- a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql +++ b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql @@ -15,9 +15,13 @@ CREATE TABLE "users" ( "github_user_created_at" TIMESTAMP WITHOUT TIME ZONE, "custom_llm_monthly_allowance_in_cents" INTEGER ); + CREATE UNIQUE INDEX "index_users_github_login" ON "users" ("github_login"); + CREATE UNIQUE INDEX "index_invite_code_users" ON "users" ("invite_code"); + CREATE INDEX "index_users_on_email_address" ON "users" ("email_address"); + CREATE UNIQUE INDEX "index_users_on_github_user_id" ON "users" ("github_user_id"); CREATE TABLE "access_tokens" ( @@ -26,6 +30,7 @@ CREATE TABLE "access_tokens" ( "impersonated_user_id" INTEGER REFERENCES users (id), "hash" VARCHAR(128) ); + CREATE INDEX "index_access_tokens_user_id" ON "access_tokens" ("user_id"); CREATE TABLE "contacts" ( @@ -36,7 +41,9 @@ CREATE TABLE "contacts" ( "should_notify" BOOLEAN NOT NULL, "accepted" BOOLEAN NOT NULL ); + CREATE UNIQUE INDEX "index_contacts_user_ids" ON "contacts" ("user_id_a", "user_id_b"); + CREATE INDEX "index_contacts_user_id_b" ON "contacts" ("user_id_b"); CREATE TABLE "rooms" ( @@ -45,6 +52,7 @@ CREATE TABLE "rooms" ( "environment" VARCHAR, "channel_id" INTEGER REFERENCES channels (id) ON DELETE CASCADE ); + CREATE UNIQUE INDEX "index_rooms_on_channel_id" ON "rooms" ("channel_id"); CREATE TABLE "projects" ( @@ -55,7 +63,9 @@ CREATE TABLE "projects" ( "host_connection_server_id" INTEGER REFERENCES servers (id) ON DELETE CASCADE, "unregistered" BOOLEAN NOT NULL DEFAULT FALSE ); + CREATE INDEX "index_projects_on_host_connection_server_id" ON "projects" ("host_connection_server_id"); + CREATE INDEX "index_projects_on_host_connection_id_and_host_connection_server_id" ON "projects" ("host_connection_id", "host_connection_server_id"); CREATE TABLE "worktrees" ( @@ -67,8 +77,9 @@ CREATE TABLE "worktrees" ( "scan_id" INTEGER NOT NULL, 
"is_complete" BOOL NOT NULL DEFAULT FALSE, "completed_scan_id" INTEGER NOT NULL, - PRIMARY KEY(project_id, id) + PRIMARY KEY (project_id, id) ); + CREATE INDEX "index_worktrees_on_project_id" ON "worktrees" ("project_id"); CREATE TABLE "worktree_entries" ( @@ -87,32 +98,33 @@ CREATE TABLE "worktree_entries" ( "is_deleted" BOOL NOT NULL, "git_status" INTEGER, "is_fifo" BOOL NOT NULL, - PRIMARY KEY(project_id, worktree_id, id), - FOREIGN KEY(project_id, worktree_id) REFERENCES worktrees (project_id, id) ON DELETE CASCADE + PRIMARY KEY (project_id, worktree_id, id), + FOREIGN KEY (project_id, worktree_id) REFERENCES worktrees (project_id, id) ON DELETE CASCADE ); + CREATE INDEX "index_worktree_entries_on_project_id" ON "worktree_entries" ("project_id"); + CREATE INDEX "index_worktree_entries_on_project_id_and_worktree_id" ON "worktree_entries" ("project_id", "worktree_id"); -CREATE TABLE "worktree_repositories" ( +CREATE TABLE "project_repositories" ( "project_id" INTEGER NOT NULL, - "worktree_id" INTEGER NOT NULL, - "work_directory_id" INTEGER NOT NULL, + "abs_path" VARCHAR, + "id" INTEGER NOT NULL, + "entry_ids" VARCHAR, + "legacy_worktree_id" INTEGER, "branch" VARCHAR, "scan_id" INTEGER NOT NULL, "is_deleted" BOOL NOT NULL, "current_merge_conflicts" VARCHAR, "branch_summary" VARCHAR, - PRIMARY KEY(project_id, worktree_id, work_directory_id), - FOREIGN KEY(project_id, worktree_id) REFERENCES worktrees (project_id, id) ON DELETE CASCADE, - FOREIGN KEY(project_id, worktree_id, work_directory_id) REFERENCES worktree_entries (project_id, worktree_id, id) ON DELETE CASCADE + PRIMARY KEY (project_id, id) ); -CREATE INDEX "index_worktree_repositories_on_project_id" ON "worktree_repositories" ("project_id"); -CREATE INDEX "index_worktree_repositories_on_project_id_and_worktree_id" ON "worktree_repositories" ("project_id", "worktree_id"); -CREATE TABLE "worktree_repository_statuses" ( +CREATE INDEX "index_project_repositories_on_project_id" ON "project_repositories" ("project_id"); + +CREATE TABLE "project_repository_statuses" ( "project_id" INTEGER NOT NULL, - "worktree_id" INT8 NOT NULL, - "work_directory_id" INT8 NOT NULL, + "repository_id" INTEGER NOT NULL, "repo_path" VARCHAR NOT NULL, "status" INT8 NOT NULL, "status_kind" INT4 NOT NULL, @@ -120,13 +132,12 @@ CREATE TABLE "worktree_repository_statuses" ( "second_status" INT4 NULL, "scan_id" INT8 NOT NULL, "is_deleted" BOOL NOT NULL, - PRIMARY KEY(project_id, worktree_id, work_directory_id, repo_path), - FOREIGN KEY(project_id, worktree_id) REFERENCES worktrees (project_id, id) ON DELETE CASCADE, - FOREIGN KEY(project_id, worktree_id, work_directory_id) REFERENCES worktree_entries (project_id, worktree_id, id) ON DELETE CASCADE + PRIMARY KEY (project_id, repository_id, repo_path) ); -CREATE INDEX "index_wt_repos_statuses_on_project_id" ON "worktree_repository_statuses" ("project_id"); -CREATE INDEX "index_wt_repos_statuses_on_project_id_and_wt_id" ON "worktree_repository_statuses" ("project_id", "worktree_id"); -CREATE INDEX "index_wt_repos_statuses_on_project_id_and_wt_id_and_wd_id" ON "worktree_repository_statuses" ("project_id", "worktree_id", "work_directory_id"); + +CREATE INDEX "index_project_repos_statuses_on_project_id" ON "project_repository_statuses" ("project_id"); + +CREATE INDEX "index_project_repos_statuses_on_project_id_and_repo_id" ON "project_repository_statuses" ("project_id", "repository_id"); CREATE TABLE "worktree_settings_files" ( "project_id" INTEGER NOT NULL, @@ -134,10 +145,12 @@ CREATE TABLE "worktree_settings_files" ( 
"path" VARCHAR NOT NULL, "content" TEXT, "kind" VARCHAR, - PRIMARY KEY(project_id, worktree_id, path), - FOREIGN KEY(project_id, worktree_id) REFERENCES worktrees (project_id, id) ON DELETE CASCADE + PRIMARY KEY (project_id, worktree_id, path), + FOREIGN KEY (project_id, worktree_id) REFERENCES worktrees (project_id, id) ON DELETE CASCADE ); + CREATE INDEX "index_worktree_settings_files_on_project_id" ON "worktree_settings_files" ("project_id"); + CREATE INDEX "index_worktree_settings_files_on_project_id_and_worktree_id" ON "worktree_settings_files" ("project_id", "worktree_id"); CREATE TABLE "worktree_diagnostic_summaries" ( @@ -147,18 +160,21 @@ CREATE TABLE "worktree_diagnostic_summaries" ( "language_server_id" INTEGER NOT NULL, "error_count" INTEGER NOT NULL, "warning_count" INTEGER NOT NULL, - PRIMARY KEY(project_id, worktree_id, path), - FOREIGN KEY(project_id, worktree_id) REFERENCES worktrees (project_id, id) ON DELETE CASCADE + PRIMARY KEY (project_id, worktree_id, path), + FOREIGN KEY (project_id, worktree_id) REFERENCES worktrees (project_id, id) ON DELETE CASCADE ); + CREATE INDEX "index_worktree_diagnostic_summaries_on_project_id" ON "worktree_diagnostic_summaries" ("project_id"); + CREATE INDEX "index_worktree_diagnostic_summaries_on_project_id_and_worktree_id" ON "worktree_diagnostic_summaries" ("project_id", "worktree_id"); CREATE TABLE "language_servers" ( "id" INTEGER NOT NULL, "project_id" INTEGER NOT NULL REFERENCES projects (id) ON DELETE CASCADE, "name" VARCHAR NOT NULL, - PRIMARY KEY(project_id, id) + PRIMARY KEY (project_id, id) ); + CREATE INDEX "index_language_servers_on_project_id" ON "language_servers" ("project_id"); CREATE TABLE "project_collaborators" ( @@ -170,11 +186,20 @@ CREATE TABLE "project_collaborators" ( "replica_id" INTEGER NOT NULL, "is_host" BOOLEAN NOT NULL ); + CREATE INDEX "index_project_collaborators_on_project_id" ON "project_collaborators" ("project_id"); + CREATE UNIQUE INDEX "index_project_collaborators_on_project_id_and_replica_id" ON "project_collaborators" ("project_id", "replica_id"); + CREATE INDEX "index_project_collaborators_on_connection_server_id" ON "project_collaborators" ("connection_server_id"); + CREATE INDEX "index_project_collaborators_on_connection_id" ON "project_collaborators" ("connection_id"); -CREATE UNIQUE INDEX "index_project_collaborators_on_project_id_connection_id_and_server_id" ON "project_collaborators" ("project_id", "connection_id", "connection_server_id"); + +CREATE UNIQUE INDEX "index_project_collaborators_on_project_id_connection_id_and_server_id" ON "project_collaborators" ( + "project_id", + "connection_id", + "connection_server_id" +); CREATE TABLE "room_participants" ( "id" INTEGER PRIMARY KEY AUTOINCREMENT, @@ -193,12 +218,21 @@ CREATE TABLE "room_participants" ( "role" TEXT, "in_call" BOOLEAN NOT NULL DEFAULT FALSE ); + CREATE UNIQUE INDEX "index_room_participants_on_user_id" ON "room_participants" ("user_id"); + CREATE INDEX "index_room_participants_on_room_id" ON "room_participants" ("room_id"); + CREATE INDEX "index_room_participants_on_answering_connection_server_id" ON "room_participants" ("answering_connection_server_id"); + CREATE INDEX "index_room_participants_on_calling_connection_server_id" ON "room_participants" ("calling_connection_server_id"); + CREATE INDEX "index_room_participants_on_answering_connection_id" ON "room_participants" ("answering_connection_id"); -CREATE UNIQUE INDEX "index_room_participants_on_answering_connection_id_and_answering_connection_server_id" ON 
"room_participants" ("answering_connection_id", "answering_connection_server_id"); + +CREATE UNIQUE INDEX "index_room_participants_on_answering_connection_id_and_answering_connection_server_id" ON "room_participants" ( + "answering_connection_id", + "answering_connection_server_id" +); CREATE TABLE "servers" ( "id" INTEGER PRIMARY KEY AUTOINCREMENT, @@ -214,9 +248,15 @@ CREATE TABLE "followers" ( "follower_connection_server_id" INTEGER NOT NULL REFERENCES servers (id) ON DELETE CASCADE, "follower_connection_id" INTEGER NOT NULL ); -CREATE UNIQUE INDEX - "index_followers_on_project_id_and_leader_connection_server_id_and_leader_connection_id_and_follower_connection_server_id_and_follower_connection_id" -ON "followers" ("project_id", "leader_connection_server_id", "leader_connection_id", "follower_connection_server_id", "follower_connection_id"); + +CREATE UNIQUE INDEX "index_followers_on_project_id_and_leader_connection_server_id_and_leader_connection_id_and_follower_connection_server_id_and_follower_connection_id" ON "followers" ( + "project_id", + "leader_connection_server_id", + "leader_connection_id", + "follower_connection_server_id", + "follower_connection_id" +); + CREATE INDEX "index_followers_on_room_id" ON "followers" ("room_id"); CREATE TABLE "channels" ( @@ -237,6 +277,7 @@ CREATE TABLE IF NOT EXISTS "channel_chat_participants" ( "connection_id" INTEGER NOT NULL, "connection_server_id" INTEGER NOT NULL REFERENCES servers (id) ON DELETE CASCADE ); + CREATE INDEX "index_channel_chat_participants_on_channel_id" ON "channel_chat_participants" ("channel_id"); CREATE TABLE IF NOT EXISTS "channel_messages" ( @@ -249,7 +290,9 @@ CREATE TABLE IF NOT EXISTS "channel_messages" ( "nonce" BLOB NOT NULL, "reply_to_message_id" INTEGER DEFAULT NULL ); + CREATE INDEX "index_channel_messages_on_channel_id" ON "channel_messages" ("channel_id"); + CREATE UNIQUE INDEX "index_channel_messages_on_sender_id_nonce" ON "channel_messages" ("sender_id", "nonce"); CREATE TABLE "channel_message_mentions" ( @@ -257,7 +300,7 @@ CREATE TABLE "channel_message_mentions" ( "start_offset" INTEGER NOT NULL, "end_offset" INTEGER NOT NULL, "user_id" INTEGER NOT NULL REFERENCES users (id) ON DELETE CASCADE, - PRIMARY KEY(message_id, start_offset) + PRIMARY KEY (message_id, start_offset) ); CREATE TABLE "channel_members" ( @@ -288,7 +331,7 @@ CREATE TABLE "buffer_operations" ( "replica_id" INTEGER NOT NULL, "lamport_timestamp" INTEGER NOT NULL, "value" BLOB NOT NULL, - PRIMARY KEY(buffer_id, epoch, lamport_timestamp, replica_id) + PRIMARY KEY (buffer_id, epoch, lamport_timestamp, replica_id) ); CREATE TABLE "buffer_snapshots" ( @@ -296,7 +339,7 @@ CREATE TABLE "buffer_snapshots" ( "epoch" INTEGER NOT NULL, "text" TEXT NOT NULL, "operation_serialization_version" INTEGER NOT NULL, - PRIMARY KEY(buffer_id, epoch) + PRIMARY KEY (buffer_id, epoch) ); CREATE TABLE "channel_buffer_collaborators" ( @@ -310,11 +353,18 @@ CREATE TABLE "channel_buffer_collaborators" ( ); CREATE INDEX "index_channel_buffer_collaborators_on_channel_id" ON "channel_buffer_collaborators" ("channel_id"); + CREATE UNIQUE INDEX "index_channel_buffer_collaborators_on_channel_id_and_replica_id" ON "channel_buffer_collaborators" ("channel_id", "replica_id"); + CREATE INDEX "index_channel_buffer_collaborators_on_connection_server_id" ON "channel_buffer_collaborators" ("connection_server_id"); + CREATE INDEX "index_channel_buffer_collaborators_on_connection_id" ON "channel_buffer_collaborators" ("connection_id"); -CREATE UNIQUE INDEX 
"index_channel_buffer_collaborators_on_channel_id_connection_id_and_server_id" ON "channel_buffer_collaborators" ("channel_id", "connection_id", "connection_server_id"); +CREATE UNIQUE INDEX "index_channel_buffer_collaborators_on_channel_id_connection_id_and_server_id" ON "channel_buffer_collaborators" ( + "channel_id", + "connection_id", + "connection_server_id" +); CREATE TABLE "feature_flags" ( "id" INTEGER PRIMARY KEY AUTOINCREMENT, @@ -324,7 +374,6 @@ CREATE TABLE "feature_flags" ( CREATE INDEX "index_feature_flags" ON "feature_flags" ("id"); - CREATE TABLE "user_features" ( "user_id" INTEGER NOT NULL REFERENCES users (id) ON DELETE CASCADE, "feature_id" INTEGER NOT NULL REFERENCES feature_flags (id) ON DELETE CASCADE, @@ -332,9 +381,10 @@ CREATE TABLE "user_features" ( ); CREATE UNIQUE INDEX "index_user_features_user_id_and_feature_id" ON "user_features" ("user_id", "feature_id"); + CREATE INDEX "index_user_features_on_user_id" ON "user_features" ("user_id"); -CREATE INDEX "index_user_features_on_feature_id" ON "user_features" ("feature_id"); +CREATE INDEX "index_user_features_on_feature_id" ON "user_features" ("feature_id"); CREATE TABLE "observed_buffer_edits" ( "user_id" INTEGER NOT NULL REFERENCES users (id) ON DELETE CASCADE, @@ -374,13 +424,10 @@ CREATE TABLE "notifications" ( "response" BOOLEAN ); -CREATE INDEX - "index_notifications_on_recipient_id_is_read_kind_entity_id" - ON "notifications" - ("recipient_id", "is_read", "kind", "entity_id"); +CREATE INDEX "index_notifications_on_recipient_id_is_read_kind_entity_id" ON "notifications" ("recipient_id", "is_read", "kind", "entity_id"); CREATE TABLE contributors ( - user_id INTEGER REFERENCES users(id), + user_id INTEGER REFERENCES users (id), signed_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (user_id) ); @@ -394,7 +441,7 @@ CREATE TABLE extensions ( ); CREATE TABLE extension_versions ( - extension_id INTEGER REFERENCES extensions(id), + extension_id INTEGER REFERENCES extensions (id), version TEXT NOT NULL, published_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, authors TEXT NOT NULL, @@ -416,6 +463,7 @@ CREATE TABLE extension_versions ( ); CREATE UNIQUE INDEX "index_extensions_external_id" ON "extensions" ("external_id"); + CREATE INDEX "index_extensions_total_download_count" ON "extensions" ("total_download_count"); CREATE TABLE rate_buckets ( @@ -424,14 +472,15 @@ CREATE TABLE rate_buckets ( token_count INT NOT NULL, last_refill TIMESTAMP WITHOUT TIME ZONE NOT NULL, PRIMARY KEY (user_id, rate_limit_name), - FOREIGN KEY (user_id) REFERENCES users(id) + FOREIGN KEY (user_id) REFERENCES users (id) ); + CREATE INDEX idx_user_id_rate_limit ON rate_buckets (user_id, rate_limit_name); CREATE TABLE IF NOT EXISTS billing_preferences ( id INTEGER PRIMARY KEY AUTOINCREMENT, created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - user_id INTEGER NOT NULL REFERENCES users(id), + user_id INTEGER NOT NULL REFERENCES users (id), max_monthly_llm_usage_spending_in_cents INTEGER NOT NULL ); @@ -440,18 +489,19 @@ CREATE UNIQUE INDEX "uix_billing_preferences_on_user_id" ON billing_preferences CREATE TABLE IF NOT EXISTS billing_customers ( id INTEGER PRIMARY KEY AUTOINCREMENT, created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - user_id INTEGER NOT NULL REFERENCES users(id), + user_id INTEGER NOT NULL REFERENCES users (id), has_overdue_invoices BOOLEAN NOT NULL DEFAULT FALSE, stripe_customer_id TEXT NOT NULL ); CREATE UNIQUE INDEX "uix_billing_customers_on_user_id" ON billing_customers (user_id); + CREATE UNIQUE 
INDEX "uix_billing_customers_on_stripe_customer_id" ON billing_customers (stripe_customer_id); CREATE TABLE IF NOT EXISTS billing_subscriptions ( id INTEGER PRIMARY KEY AUTOINCREMENT, created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - billing_customer_id INTEGER NOT NULL REFERENCES billing_customers(id), + billing_customer_id INTEGER NOT NULL REFERENCES billing_customers (id), stripe_subscription_id TEXT NOT NULL, stripe_subscription_status TEXT NOT NULL, stripe_cancel_at TIMESTAMP, @@ -459,6 +509,7 @@ CREATE TABLE IF NOT EXISTS billing_subscriptions ( ); CREATE INDEX "ix_billing_subscriptions_on_billing_customer_id" ON billing_subscriptions (billing_customer_id); + CREATE UNIQUE INDEX "uix_billing_subscriptions_on_stripe_subscription_id" ON billing_subscriptions (stripe_subscription_id); CREATE TABLE IF NOT EXISTS processed_stripe_events ( @@ -479,4 +530,5 @@ CREATE TABLE IF NOT EXISTS "breakpoints" ( "path" TEXT NOT NULL, "kind" VARCHAR NOT NULL ); + CREATE INDEX "index_breakpoints_on_project_id" ON "breakpoints" ("project_id"); diff --git a/crates/collab/migrations/20250319182812_create_project_repositories.sql b/crates/collab/migrations/20250319182812_create_project_repositories.sql new file mode 100644 index 0000000000000000000000000000000000000000..8ca8c3444e60ccc4105e01e7a0d035930d57da4d --- /dev/null +++ b/crates/collab/migrations/20250319182812_create_project_repositories.sql @@ -0,0 +1,32 @@ +CREATE TABLE "project_repositories" ( + "project_id" INTEGER NOT NULL, + "abs_path" VARCHAR, + "id" INT8 NOT NULL, + "legacy_worktree_id" INT8, + "entry_ids" VARCHAR, + "branch" VARCHAR, + "scan_id" INT8 NOT NULL, + "is_deleted" BOOL NOT NULL, + "current_merge_conflicts" VARCHAR, + "branch_summary" VARCHAR, + PRIMARY KEY (project_id, id) +); + +CREATE INDEX "index_project_repositories_on_project_id" ON "project_repositories" ("project_id"); + +CREATE TABLE "project_repository_statuses" ( + "project_id" INTEGER NOT NULL, + "repository_id" INT8 NOT NULL, + "repo_path" VARCHAR NOT NULL, + "status" INT8 NOT NULL, + "status_kind" INT4 NOT NULL, + "first_status" INT4 NULL, + "second_status" INT4 NULL, + "scan_id" INT8 NOT NULL, + "is_deleted" BOOL NOT NULL, + PRIMARY KEY (project_id, repository_id, repo_path) +); + +CREATE INDEX "index_project_repos_statuses_on_project_id" ON "project_repository_statuses" ("project_id"); + +CREATE INDEX "index_project_repos_statuses_on_project_id_and_repo_id" ON "project_repository_statuses" ("project_id", "repository_id"); diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index 908e488af6dd52686bcf659e4e8e61db6eb4a33a..46915a4d6214196f781cc65420b5aba5d82800b5 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -9,6 +9,7 @@ use anyhow::anyhow; use collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use dashmap::DashMap; use futures::StreamExt; +use project_repository_statuses::StatusKind; use rand::{prelude::StdRng, Rng, SeedableRng}; use rpc::ExtensionProvides; use rpc::{ @@ -36,7 +37,6 @@ use std::{ }; use time::PrimitiveDateTime; use tokio::sync::{Mutex, OwnedMutexGuard}; -use worktree_repository_statuses::StatusKind; use worktree_settings_file::LocalSettingsKind; #[cfg(test)] @@ -658,6 +658,8 @@ pub struct RejoinedProject { pub old_connection_id: ConnectionId, pub collaborators: Vec, pub worktrees: Vec, + pub updated_repositories: Vec, + pub removed_repositories: Vec, pub language_servers: Vec, } @@ -726,6 +728,7 @@ pub struct Project { pub role: ChannelRole, pub collaborators: Vec, pub worktrees: BTreeMap, + pub 
repositories: Vec, pub language_servers: Vec, } @@ -760,7 +763,7 @@ pub struct Worktree { pub root_name: String, pub visible: bool, pub entries: Vec, - pub repository_entries: BTreeMap, + pub legacy_repository_entries: BTreeMap, pub diagnostic_summaries: Vec, pub settings_files: Vec, pub scan_id: u64, @@ -810,7 +813,7 @@ impl LocalSettingsKind { } fn db_status_to_proto( - entry: worktree_repository_statuses::Model, + entry: project_repository_statuses::Model, ) -> anyhow::Result { use proto::git_file_status::{Tracked, Unmerged, Variant}; diff --git a/crates/collab/src/db/queries/projects.rs b/crates/collab/src/db/queries/projects.rs index 2970c9be0ffb6d6a50ae41b663af4f54b7f6e1b9..e675bc5e681e6d7323c172d0564d1dfe03a8a041 100644 --- a/crates/collab/src/db/queries/projects.rs +++ b/crates/collab/src/db/queries/projects.rs @@ -324,119 +324,135 @@ impl Database { .await?; } - if !update.updated_repositories.is_empty() { - worktree_repository::Entity::insert_many(update.updated_repositories.iter().map( - |repository| { - worktree_repository::ActiveModel { - project_id: ActiveValue::set(project_id), - worktree_id: ActiveValue::set(worktree_id), - work_directory_id: ActiveValue::set( - repository.work_directory_id as i64, - ), - scan_id: ActiveValue::set(update.scan_id as i64), - branch: ActiveValue::set(repository.branch.clone()), - is_deleted: ActiveValue::set(false), - branch_summary: ActiveValue::Set( - repository - .branch_summary - .as_ref() - .map(|summary| serde_json::to_string(summary).unwrap()), - ), - current_merge_conflicts: ActiveValue::Set(Some( - serde_json::to_string(&repository.current_merge_conflicts).unwrap(), - )), - } - }, - )) - .on_conflict( - OnConflict::columns([ - worktree_repository::Column::ProjectId, - worktree_repository::Column::WorktreeId, - worktree_repository::Column::WorkDirectoryId, - ]) - .update_columns([ - worktree_repository::Column::ScanId, - worktree_repository::Column::Branch, - worktree_repository::Column::BranchSummary, - worktree_repository::Column::CurrentMergeConflicts, - ]) - .to_owned(), - ) - .exec(&*tx) - .await?; - - let has_any_statuses = update - .updated_repositories - .iter() - .any(|repository| !repository.updated_statuses.is_empty()); - - if has_any_statuses { - worktree_repository_statuses::Entity::insert_many( - update.updated_repositories.iter().flat_map( - |repository: &proto::RepositoryEntry| { - repository.updated_statuses.iter().map(|status_entry| { - let (repo_path, status_kind, first_status, second_status) = - proto_status_to_db(status_entry.clone()); - worktree_repository_statuses::ActiveModel { - project_id: ActiveValue::set(project_id), - worktree_id: ActiveValue::set(worktree_id), - work_directory_id: ActiveValue::set( - repository.work_directory_id as i64, - ), - scan_id: ActiveValue::set(update.scan_id as i64), - is_deleted: ActiveValue::set(false), - repo_path: ActiveValue::set(repo_path), - status: ActiveValue::set(0), - status_kind: ActiveValue::set(status_kind), - first_status: ActiveValue::set(first_status), - second_status: ActiveValue::set(second_status), - } - }) - }, - ), + // Backward-compatibility for old Zed clients. + // + // Remove this block when Zed 1.80 stable has been out for a week. 
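The backward-compatibility block below never hard-deletes rows: repositories are upserted keyed on (project_id, id), and removals only flip is_deleted and bump scan_id so that rejoining clients can still observe them. A minimal in-memory sketch of that tombstone pattern (the row type and names here are illustrative, not the collab schema):

    use std::collections::HashMap;

    // Illustrative stand-in for a project_repositories row.
    struct RepoRow {
        scan_id: u64,
        is_deleted: bool,
        branch_summary: Option<String>,
    }

    #[derive(Default)]
    struct RepoTable {
        // Keyed on repository id; the real table key is (project_id, id).
        rows: HashMap<u64, RepoRow>,
    }

    impl RepoTable {
        // Insert the row, or overwrite the mutable columns on conflict,
        // mirroring sea-orm's `on_conflict(...).update_columns(...)`.
        fn upsert(&mut self, id: u64, scan_id: u64, branch_summary: Option<String>) {
            let row = self.rows.entry(id).or_insert(RepoRow {
                scan_id,
                is_deleted: false,
                branch_summary: None,
            });
            row.scan_id = scan_id;
            row.is_deleted = false;
            row.branch_summary = branch_summary;
        }

        // Removal is a soft delete: the row stays behind as a tombstone with
        // a newer scan_id, so a rejoining client whose last-seen scan_id is
        // older will be told about the deletion.
        fn remove(&mut self, id: u64, scan_id: u64) {
            if let Some(row) = self.rows.get_mut(&id) {
                row.is_deleted = true;
                row.scan_id = scan_id;
            }
        }
    }

    fn main() {
        let mut table = RepoTable::default();
        table.upsert(1, 10, Some("main".into()));
        table.remove(1, 11);
        assert!(table.rows[&1].is_deleted);
    }
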
+ { + if !update.updated_repositories.is_empty() { + project_repository::Entity::insert_many( + update.updated_repositories.iter().map(|repository| { + project_repository::ActiveModel { + project_id: ActiveValue::set(project_id), + legacy_worktree_id: ActiveValue::set(Some(worktree_id)), + id: ActiveValue::set(repository.work_directory_id as i64), + scan_id: ActiveValue::set(update.scan_id as i64), + is_deleted: ActiveValue::set(false), + branch_summary: ActiveValue::Set( + repository + .branch_summary + .as_ref() + .map(|summary| serde_json::to_string(summary).unwrap()), + ), + current_merge_conflicts: ActiveValue::Set(Some( + serde_json::to_string(&repository.current_merge_conflicts) + .unwrap(), + )), + + // Old clients do not use abs path or entry ids. + abs_path: ActiveValue::set(String::new()), + entry_ids: ActiveValue::set("[]".into()), + } + }), ) .on_conflict( OnConflict::columns([ - worktree_repository_statuses::Column::ProjectId, - worktree_repository_statuses::Column::WorktreeId, - worktree_repository_statuses::Column::WorkDirectoryId, - worktree_repository_statuses::Column::RepoPath, + project_repository::Column::ProjectId, + project_repository::Column::Id, ]) .update_columns([ - worktree_repository_statuses::Column::ScanId, - worktree_repository_statuses::Column::StatusKind, - worktree_repository_statuses::Column::FirstStatus, - worktree_repository_statuses::Column::SecondStatus, + project_repository::Column::ScanId, + project_repository::Column::BranchSummary, + project_repository::Column::CurrentMergeConflicts, ]) .to_owned(), ) .exec(&*tx) .await?; - } - let has_any_removed_statuses = update - .updated_repositories - .iter() - .any(|repository| !repository.removed_statuses.is_empty()); + let has_any_statuses = update + .updated_repositories + .iter() + .any(|repository| !repository.updated_statuses.is_empty()); + + if has_any_statuses { + project_repository_statuses::Entity::insert_many( + update.updated_repositories.iter().flat_map( + |repository: &proto::RepositoryEntry| { + repository.updated_statuses.iter().map(|status_entry| { + let (repo_path, status_kind, first_status, second_status) = + proto_status_to_db(status_entry.clone()); + project_repository_statuses::ActiveModel { + project_id: ActiveValue::set(project_id), + repository_id: ActiveValue::set( + repository.work_directory_id as i64, + ), + scan_id: ActiveValue::set(update.scan_id as i64), + is_deleted: ActiveValue::set(false), + repo_path: ActiveValue::set(repo_path), + status: ActiveValue::set(0), + status_kind: ActiveValue::set(status_kind), + first_status: ActiveValue::set(first_status), + second_status: ActiveValue::set(second_status), + } + }) + }, + ), + ) + .on_conflict( + OnConflict::columns([ + project_repository_statuses::Column::ProjectId, + project_repository_statuses::Column::RepositoryId, + project_repository_statuses::Column::RepoPath, + ]) + .update_columns([ + project_repository_statuses::Column::ScanId, + project_repository_statuses::Column::StatusKind, + project_repository_statuses::Column::FirstStatus, + project_repository_statuses::Column::SecondStatus, + ]) + .to_owned(), + ) + .exec(&*tx) + .await?; + } + + for repo in &update.updated_repositories { + if !repo.removed_statuses.is_empty() { + project_repository_statuses::Entity::update_many() + .filter( + project_repository_statuses::Column::ProjectId + .eq(project_id) + .and( + project_repository_statuses::Column::RepositoryId + .eq(repo.work_directory_id), + ) + .and( + project_repository_statuses::Column::RepoPath + 
.is_in(repo.removed_statuses.iter()), + ), + ) + .set(project_repository_statuses::ActiveModel { + is_deleted: ActiveValue::Set(true), + scan_id: ActiveValue::Set(update.scan_id as i64), + ..Default::default() + }) + .exec(&*tx) + .await?; + } + } + } - if has_any_removed_statuses { - worktree_repository_statuses::Entity::update_many() + if !update.removed_repositories.is_empty() { + project_repository::Entity::update_many() .filter( - worktree_repository_statuses::Column::ProjectId + project_repository::Column::ProjectId .eq(project_id) - .and( - worktree_repository_statuses::Column::WorktreeId - .eq(worktree_id), - ) - .and( - worktree_repository_statuses::Column::RepoPath.is_in( - update.updated_repositories.iter().flat_map(|repository| { - repository.removed_statuses.iter() - }), - ), - ), + .and(project_repository::Column::LegacyWorktreeId.eq(worktree_id)) + .and(project_repository::Column::Id.is_in( + update.removed_repositories.iter().map(|id| *id as i64), + )), ) - .set(worktree_repository_statuses::ActiveModel { + .set(project_repository::ActiveModel { is_deleted: ActiveValue::Set(true), scan_id: ActiveValue::Set(update.scan_id as i64), ..Default::default() @@ -446,18 +462,109 @@ impl Database { } } - if !update.removed_repositories.is_empty() { - worktree_repository::Entity::update_many() + let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?; + Ok(connection_ids) + }) + .await + } + + pub async fn update_repository( + &self, + update: &proto::UpdateRepository, + _connection: ConnectionId, + ) -> Result>> { + let project_id = ProjectId::from_proto(update.project_id); + let repository_id = update.id as i64; + self.project_transaction(project_id, |tx| async move { + project_repository::Entity::insert(project_repository::ActiveModel { + project_id: ActiveValue::set(project_id), + id: ActiveValue::set(repository_id), + legacy_worktree_id: ActiveValue::set(None), + abs_path: ActiveValue::set(update.abs_path.clone()), + entry_ids: ActiveValue::Set(serde_json::to_string(&update.entry_ids).unwrap()), + scan_id: ActiveValue::set(update.scan_id as i64), + is_deleted: ActiveValue::set(false), + branch_summary: ActiveValue::Set( + update + .branch_summary + .as_ref() + .map(|summary| serde_json::to_string(summary).unwrap()), + ), + current_merge_conflicts: ActiveValue::Set(Some( + serde_json::to_string(&update.current_merge_conflicts).unwrap(), + )), + }) + .on_conflict( + OnConflict::columns([ + project_repository::Column::ProjectId, + project_repository::Column::Id, + ]) + .update_columns([ + project_repository::Column::ScanId, + project_repository::Column::BranchSummary, + project_repository::Column::EntryIds, + project_repository::Column::AbsPath, + project_repository::Column::CurrentMergeConflicts, + ]) + .to_owned(), + ) + .exec(&*tx) + .await?; + + let has_any_statuses = !update.updated_statuses.is_empty(); + + if has_any_statuses { + project_repository_statuses::Entity::insert_many( + update.updated_statuses.iter().map(|status_entry| { + let (repo_path, status_kind, first_status, second_status) = + proto_status_to_db(status_entry.clone()); + project_repository_statuses::ActiveModel { + project_id: ActiveValue::set(project_id), + repository_id: ActiveValue::set(repository_id), + scan_id: ActiveValue::set(update.scan_id as i64), + is_deleted: ActiveValue::set(false), + repo_path: ActiveValue::set(repo_path), + status: ActiveValue::set(0), + status_kind: ActiveValue::set(status_kind), + first_status: ActiveValue::set(first_status), + second_status: 
ActiveValue::set(second_status), + } + }), + ) + .on_conflict( + OnConflict::columns([ + project_repository_statuses::Column::ProjectId, + project_repository_statuses::Column::RepositoryId, + project_repository_statuses::Column::RepoPath, + ]) + .update_columns([ + project_repository_statuses::Column::ScanId, + project_repository_statuses::Column::StatusKind, + project_repository_statuses::Column::FirstStatus, + project_repository_statuses::Column::SecondStatus, + ]) + .to_owned(), + ) + .exec(&*tx) + .await?; + } + + let has_any_removed_statuses = !update.removed_statuses.is_empty(); + + if has_any_removed_statuses { + project_repository_statuses::Entity::update_many() .filter( - worktree_repository::Column::ProjectId + project_repository_statuses::Column::ProjectId .eq(project_id) - .and(worktree_repository::Column::WorktreeId.eq(worktree_id)) .and( - worktree_repository::Column::WorkDirectoryId - .is_in(update.removed_repositories.iter().map(|id| *id as i64)), + project_repository_statuses::Column::RepositoryId.eq(repository_id), + ) + .and( + project_repository_statuses::Column::RepoPath + .is_in(update.removed_statuses.iter()), ), ) - .set(worktree_repository::ActiveModel { + .set(project_repository_statuses::ActiveModel { is_deleted: ActiveValue::Set(true), scan_id: ActiveValue::Set(update.scan_id as i64), ..Default::default() @@ -472,6 +579,34 @@ impl Database { .await } + pub async fn remove_repository( + &self, + remove: &proto::RemoveRepository, + _connection: ConnectionId, + ) -> Result>> { + let project_id = ProjectId::from_proto(remove.project_id); + let repository_id = remove.id as i64; + self.project_transaction(project_id, |tx| async move { + project_repository::Entity::update_many() + .filter( + project_repository::Column::ProjectId + .eq(project_id) + .and(project_repository::Column::Id.eq(repository_id)), + ) + .set(project_repository::ActiveModel { + is_deleted: ActiveValue::Set(true), + // scan_id: ActiveValue::Set(update.scan_id as i64), + ..Default::default() + }) + .exec(&*tx) + .await?; + + let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?; + Ok(connection_ids) + }) + .await + } + /// Updates the diagnostic summary for the given connection. pub async fn update_diagnostic_summary( &self, @@ -703,11 +838,11 @@ impl Database { root_name: db_worktree.root_name, visible: db_worktree.visible, entries: Default::default(), - repository_entries: Default::default(), diagnostic_summaries: Default::default(), settings_files: Default::default(), scan_id: db_worktree.scan_id as u64, completed_scan_id: db_worktree.completed_scan_id as u64, + legacy_repository_entries: Default::default(), }, ) }) @@ -750,65 +885,77 @@ impl Database { } // Populate repository entries. 
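The repository-population pass that follows serves two generations of client state: rows carrying a legacy_worktree_id are folded back into that worktree's RepositoryEntry map, while rows written through the new UpdateRepository path are collected as standalone messages. A condensed sketch of that routing decision, with field sets trimmed to what drives the branch:

    // Illustrative row: only the fields that decide the routing.
    struct DbRepositoryRow {
        id: u64,
        legacy_worktree_id: Option<u64>,
    }

    enum RoutedRepository {
        // Old wire format: nested in an UpdateWorktree for this worktree.
        LegacyWorktreeEntry { worktree_id: u64, work_directory_id: u64 },
        // New wire format: sent as a standalone UpdateRepository message.
        Standalone { repository_id: u64 },
    }

    fn route(row: DbRepositoryRow) -> RoutedRepository {
        match row.legacy_worktree_id {
            Some(worktree_id) => RoutedRepository::LegacyWorktreeEntry {
                worktree_id,
                work_directory_id: row.id,
            },
            None => RoutedRepository::Standalone { repository_id: row.id },
        }
    }

    fn main() {
        match route(DbRepositoryRow { id: 7, legacy_worktree_id: None }) {
            RoutedRepository::Standalone { repository_id } => assert_eq!(repository_id, 7),
            _ => unreachable!(),
        }
    }
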
+ let mut repositories = Vec::new(); { - let db_repository_entries = worktree_repository::Entity::find() + let db_repository_entries = project_repository::Entity::find() .filter( Condition::all() - .add(worktree_repository::Column::ProjectId.eq(project.id)) - .add(worktree_repository::Column::IsDeleted.eq(false)), + .add(project_repository::Column::ProjectId.eq(project.id)) + .add(project_repository::Column::IsDeleted.eq(false)), ) .all(tx) .await?; for db_repository_entry in db_repository_entries { - if let Some(worktree) = worktrees.get_mut(&(db_repository_entry.worktree_id as u64)) - { - let mut repository_statuses = worktree_repository_statuses::Entity::find() - .filter( - Condition::all() - .add(worktree_repository_statuses::Column::ProjectId.eq(project.id)) - .add( - worktree_repository_statuses::Column::WorktreeId - .eq(worktree.id), - ) - .add( - worktree_repository_statuses::Column::WorkDirectoryId - .eq(db_repository_entry.work_directory_id), - ) - .add(worktree_repository_statuses::Column::IsDeleted.eq(false)), - ) - .stream(tx) - .await?; - let mut updated_statuses = Vec::new(); - while let Some(status_entry) = repository_statuses.next().await { - let status_entry: worktree_repository_statuses::Model = status_entry?; - updated_statuses.push(db_status_to_proto(status_entry)?); - } - - let current_merge_conflicts = db_repository_entry - .current_merge_conflicts - .as_ref() - .map(|conflicts| serde_json::from_str(&conflicts)) - .transpose()? - .unwrap_or_default(); + let mut repository_statuses = project_repository_statuses::Entity::find() + .filter( + Condition::all() + .add(project_repository_statuses::Column::ProjectId.eq(project.id)) + .add( + project_repository_statuses::Column::RepositoryId + .eq(db_repository_entry.id), + ) + .add(project_repository_statuses::Column::IsDeleted.eq(false)), + ) + .stream(tx) + .await?; + let mut updated_statuses = Vec::new(); + while let Some(status_entry) = repository_statuses.next().await { + let status_entry = status_entry?; + updated_statuses.push(db_status_to_proto(status_entry)?); + } - let branch_summary = db_repository_entry - .branch_summary - .as_ref() - .map(|branch_summary| serde_json::from_str(&branch_summary)) - .transpose()? - .unwrap_or_default(); - - worktree.repository_entries.insert( - db_repository_entry.work_directory_id as u64, - proto::RepositoryEntry { - work_directory_id: db_repository_entry.work_directory_id as u64, - branch: db_repository_entry.branch, - updated_statuses, - removed_statuses: Vec::new(), - current_merge_conflicts, - branch_summary, - }, - ); + let current_merge_conflicts = db_repository_entry + .current_merge_conflicts + .as_ref() + .map(|conflicts| serde_json::from_str(&conflicts)) + .transpose()? + .unwrap_or_default(); + + let branch_summary = db_repository_entry + .branch_summary + .as_ref() + .map(|branch_summary| serde_json::from_str(&branch_summary)) + .transpose()? 
+ .unwrap_or_default(); + + let entry_ids = serde_json::from_str(&db_repository_entry.entry_ids) + .context("failed to deserialize repository's entry ids")?; + + if let Some(worktree_id) = db_repository_entry.legacy_worktree_id { + if let Some(worktree) = worktrees.get_mut(&(worktree_id as u64)) { + worktree.legacy_repository_entries.insert( + db_repository_entry.id as u64, + proto::RepositoryEntry { + work_directory_id: db_repository_entry.id as u64, + updated_statuses, + removed_statuses: Vec::new(), + current_merge_conflicts, + branch_summary, + }, + ); + } + } else { + repositories.push(proto::UpdateRepository { + project_id: db_repository_entry.project_id.0 as u64, + id: db_repository_entry.id as u64, + abs_path: db_repository_entry.abs_path, + entry_ids, + updated_statuses, + removed_statuses: Vec::new(), + current_merge_conflicts, + branch_summary, + scan_id: db_repository_entry.scan_id as u64, + }); } } } @@ -871,6 +1018,7 @@ impl Database { }) .collect(), worktrees, + repositories, language_servers: language_servers .into_iter() .map(|language_server| proto::LanguageServer { diff --git a/crates/collab/src/db/queries/rooms.rs b/crates/collab/src/db/queries/rooms.rs index 3f65cc4258e6c1891c844c4263b9f5262fed7b09..a9032ac42ffa88f67046c96cde5de3b65ff4260b 100644 --- a/crates/collab/src/db/queries/rooms.rs +++ b/crates/collab/src/db/queries/rooms.rs @@ -1,3 +1,5 @@ +use anyhow::Context as _; + use super::*; impl Database { @@ -606,6 +608,11 @@ impl Database { let mut worktrees = Vec::new(); let db_worktrees = project.find_related(worktree::Entity).all(tx).await?; + let db_repos = project + .find_related(project_repository::Entity) + .all(tx) + .await?; + for db_worktree in db_worktrees { let mut worktree = RejoinedWorktree { id: db_worktree.id as u64, @@ -673,96 +680,112 @@ impl Database { } } - // Repository Entries - { - let repository_entry_filter = if let Some(rejoined_worktree) = rejoined_worktree { - worktree_repository::Column::ScanId.gt(rejoined_worktree.scan_id) - } else { - worktree_repository::Column::IsDeleted.eq(false) - }; + worktrees.push(worktree); + } - let db_repositories = worktree_repository::Entity::find() - .filter( - Condition::all() - .add(worktree_repository::Column::ProjectId.eq(project.id)) - .add(worktree_repository::Column::WorktreeId.eq(worktree.id)) - .add(repository_entry_filter), - ) - .all(tx) - .await?; + let mut removed_repositories = Vec::new(); + let mut updated_repositories = Vec::new(); + for db_repo in db_repos { + let rejoined_repository = rejoined_project + .repositories + .iter() + .find(|repo| repo.id == db_repo.id as u64); - for db_repository in db_repositories.into_iter() { - if db_repository.is_deleted { - worktree - .removed_repositories - .push(db_repository.work_directory_id as u64); + let repository_filter = if let Some(rejoined_repository) = rejoined_repository { + project_repository::Column::ScanId.gt(rejoined_repository.scan_id) + } else { + project_repository::Column::IsDeleted.eq(false) + }; + + let db_repositories = project_repository::Entity::find() + .filter( + Condition::all() + .add(project_repository::Column::ProjectId.eq(project.id)) + .add(repository_filter), + ) + .all(tx) + .await?; + + for db_repository in db_repositories.into_iter() { + if db_repository.is_deleted { + removed_repositories.push(db_repository.id as u64); + } else { + let status_entry_filter = if let Some(rejoined_repository) = rejoined_repository + { + project_repository_statuses::Column::ScanId.gt(rejoined_repository.scan_id) } else { - let 
status_entry_filter = if let Some(rejoined_worktree) = rejoined_worktree - { - worktree_repository_statuses::Column::ScanId - .gt(rejoined_worktree.scan_id) + project_repository_statuses::Column::IsDeleted.eq(false) + }; + + let mut db_statuses = project_repository_statuses::Entity::find() + .filter( + Condition::all() + .add(project_repository_statuses::Column::ProjectId.eq(project.id)) + .add( + project_repository_statuses::Column::RepositoryId + .eq(db_repository.id), + ) + .add(status_entry_filter), + ) + .stream(tx) + .await?; + let mut removed_statuses = Vec::new(); + let mut updated_statuses = Vec::new(); + + while let Some(db_status) = db_statuses.next().await { + let db_status: project_repository_statuses::Model = db_status?; + if db_status.is_deleted { + removed_statuses.push(db_status.repo_path); } else { - worktree_repository_statuses::Column::IsDeleted.eq(false) - }; - - let mut db_statuses = worktree_repository_statuses::Entity::find() - .filter( - Condition::all() - .add( - worktree_repository_statuses::Column::ProjectId - .eq(project.id), - ) - .add( - worktree_repository_statuses::Column::WorktreeId - .eq(worktree.id), - ) - .add( - worktree_repository_statuses::Column::WorkDirectoryId - .eq(db_repository.work_directory_id), - ) - .add(status_entry_filter), - ) - .stream(tx) - .await?; - let mut removed_statuses = Vec::new(); - let mut updated_statuses = Vec::new(); - - while let Some(db_status) = db_statuses.next().await { - let db_status: worktree_repository_statuses::Model = db_status?; - if db_status.is_deleted { - removed_statuses.push(db_status.repo_path); - } else { - updated_statuses.push(db_status_to_proto(db_status)?); - } + updated_statuses.push(db_status_to_proto(db_status)?); } + } - let current_merge_conflicts = db_repository - .current_merge_conflicts - .as_ref() - .map(|conflicts| serde_json::from_str(&conflicts)) - .transpose()? - .unwrap_or_default(); - - let branch_summary = db_repository - .branch_summary - .as_ref() - .map(|branch_summary| serde_json::from_str(&branch_summary)) - .transpose()? - .unwrap_or_default(); - - worktree.updated_repositories.push(proto::RepositoryEntry { - work_directory_id: db_repository.work_directory_id as u64, - branch: db_repository.branch, + let current_merge_conflicts = db_repository + .current_merge_conflicts + .as_ref() + .map(|conflicts| serde_json::from_str(&conflicts)) + .transpose()? + .unwrap_or_default(); + + let branch_summary = db_repository + .branch_summary + .as_ref() + .map(|branch_summary| serde_json::from_str(&branch_summary)) + .transpose()? 
+ .unwrap_or_default(); + + let entry_ids = serde_json::from_str(&db_repository.entry_ids) + .context("failed to deserialize repository's entry ids")?; + + if let Some(legacy_worktree_id) = db_repository.legacy_worktree_id { + if let Some(worktree) = worktrees + .iter_mut() + .find(|worktree| worktree.id as i64 == legacy_worktree_id) + { + worktree.updated_repositories.push(proto::RepositoryEntry { + work_directory_id: db_repository.id as u64, + updated_statuses, + removed_statuses, + current_merge_conflicts, + branch_summary, + }); + } + } else { + updated_repositories.push(proto::UpdateRepository { + entry_ids, updated_statuses, removed_statuses, current_merge_conflicts, branch_summary, + project_id: project_id.to_proto(), + id: db_repository.id as u64, + abs_path: db_repository.abs_path, + scan_id: db_repository.scan_id as u64, }); } } } - - worktrees.push(worktree); } let language_servers = project @@ -832,6 +855,8 @@ impl Database { id: project_id, old_connection_id, collaborators, + updated_repositories, + removed_repositories, worktrees, language_servers, })) diff --git a/crates/collab/src/db/tables.rs b/crates/collab/src/db/tables.rs index 8a4ec29998ac8693186d22c0745c8277caa62502..f3dfa6c3ab35253794138158381eede46094546c 100644 --- a/crates/collab/src/db/tables.rs +++ b/crates/collab/src/db/tables.rs @@ -26,6 +26,8 @@ pub mod observed_channel_messages; pub mod processed_stripe_event; pub mod project; pub mod project_collaborator; +pub mod project_repository; +pub mod project_repository_statuses; pub mod rate_buckets; pub mod room; pub mod room_participant; @@ -36,6 +38,4 @@ pub mod user_feature; pub mod worktree; pub mod worktree_diagnostic_summary; pub mod worktree_entry; -pub mod worktree_repository; -pub mod worktree_repository_statuses; pub mod worktree_settings_file; diff --git a/crates/collab/src/db/tables/project.rs b/crates/collab/src/db/tables/project.rs index 10e3da50e1dd09932913bf45c0792decf871de50..0d4d1aa419d1dba076e1338c496606c684e0e008 100644 --- a/crates/collab/src/db/tables/project.rs +++ b/crates/collab/src/db/tables/project.rs @@ -45,6 +45,8 @@ pub enum Relation { Room, #[sea_orm(has_many = "super::worktree::Entity")] Worktrees, + #[sea_orm(has_many = "super::project_repository::Entity")] + Repositories, #[sea_orm(has_many = "super::project_collaborator::Entity")] Collaborators, #[sea_orm(has_many = "super::language_server::Entity")] @@ -69,6 +71,12 @@ impl Related for Entity { } } +impl Related for Entity { + fn to() -> RelationDef { + Relation::Repositories.def() + } +} + impl Related for Entity { fn to() -> RelationDef { Relation::Collaborators.def() diff --git a/crates/collab/src/db/tables/worktree_repository.rs b/crates/collab/src/db/tables/project_repository.rs similarity index 51% rename from crates/collab/src/db/tables/worktree_repository.rs rename to crates/collab/src/db/tables/project_repository.rs index 66247f9f17c45e215f833eeed73aed31c6a9b620..36fb4a54c6e861d91f87523051083e91f519d490 100644 --- a/crates/collab/src/db/tables/worktree_repository.rs +++ b/crates/collab/src/db/tables/project_repository.rs @@ -2,16 +2,17 @@ use crate::db::ProjectId; use sea_orm::entity::prelude::*; #[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)] -#[sea_orm(table_name = "worktree_repositories")] +#[sea_orm(table_name = "project_repositories")] pub struct Model { #[sea_orm(primary_key)] pub project_id: ProjectId, #[sea_orm(primary_key)] - pub worktree_id: i64, - #[sea_orm(primary_key)] - pub work_directory_id: i64, + pub id: i64, + pub abs_path: String, + pub 
legacy_worktree_id: Option, + // JSON array containing 1 or more integer project entry ids + pub entry_ids: String, pub scan_id: i64, - pub branch: Option, pub is_deleted: bool, // JSON array typed string pub current_merge_conflicts: Option, @@ -20,6 +21,19 @@ pub struct Model { } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] -pub enum Relation {} +pub enum Relation { + #[sea_orm( + belongs_to = "super::project::Entity", + from = "Column::ProjectId", + to = "super::project::Column::Id" + )] + Project, +} + +impl Related for Entity { + fn to() -> RelationDef { + Relation::Project.def() + } +} impl ActiveModelBehavior for ActiveModel {} diff --git a/crates/collab/src/db/tables/worktree_repository_statuses.rs b/crates/collab/src/db/tables/project_repository_statuses.rs similarity index 88% rename from crates/collab/src/db/tables/worktree_repository_statuses.rs rename to crates/collab/src/db/tables/project_repository_statuses.rs index 3e4a4f550e6be1ca1d43ad3b1d8336e02024bac2..7bb903d45085467a3285a58f8afdd7a29339731a 100644 --- a/crates/collab/src/db/tables/worktree_repository_statuses.rs +++ b/crates/collab/src/db/tables/project_repository_statuses.rs @@ -2,14 +2,12 @@ use crate::db::ProjectId; use sea_orm::entity::prelude::*; #[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel)] -#[sea_orm(table_name = "worktree_repository_statuses")] +#[sea_orm(table_name = "project_repository_statuses")] pub struct Model { #[sea_orm(primary_key)] pub project_id: ProjectId, #[sea_orm(primary_key)] - pub worktree_id: i64, - #[sea_orm(primary_key)] - pub work_directory_id: i64, + pub repository_id: i64, #[sea_orm(primary_key)] pub repo_path: String, /// Old single-code status field, no longer used but kept here to mirror the DB schema. diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index a22f5e5646a64161d3de45eacec7c76f28a597dd..96e3694eafa525385a63119e6d2e0646cd20b9dc 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -37,6 +37,7 @@ use core::fmt::{self, Debug, Formatter}; use http_client::HttpClient; use open_ai::{OpenAiEmbeddingModel, OPEN_AI_API_URL}; use reqwest_client::ReqwestClient; +use rpc::proto::split_repository_update; use sha2::Digest; use supermaven_api::{CreateExternalUserRequest, SupermavenAdminApi}; @@ -291,6 +292,8 @@ impl Server { .add_message_handler(leave_project) .add_request_handler(update_project) .add_request_handler(update_worktree) + .add_request_handler(update_repository) + .add_request_handler(remove_repository) .add_message_handler(start_language_server) .add_message_handler(update_language_server) .add_message_handler(update_diagnostic_summary) @@ -1464,7 +1467,7 @@ fn notify_rejoined_projects( removed_repositories: worktree.removed_repositories, }; for update in proto::split_worktree_update(message) { - session.peer.send(session.connection_id, update.clone())?; + session.peer.send(session.connection_id, update)?; } // Stream this worktree's diagnostics. 
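Here and in join_project_internal, worktree state is fanned out through proto::split_worktree_update, and repository state goes through the new split_repository_update; both presumably exist to keep any single RPC message bounded. A hypothetical sketch of such a splitter, assuming an illustrative CHUNK_SIZE and a pared-down message type (the real function lives in crates/proto and its chunking rules may differ):

    // Pared-down stand-in for proto::UpdateRepository; only the repeated
    // fields that can grow without bound are modeled.
    #[derive(Default)]
    struct UpdateRepository {
        updated_statuses: Vec<String>,
        removed_statuses: Vec<String>,
    }

    const CHUNK_SIZE: usize = 256; // illustrative, not the real limit

    fn split_repository_update(mut update: UpdateRepository) -> Vec<UpdateRepository> {
        let mut messages = Vec::new();
        while update.updated_statuses.len() > CHUNK_SIZE {
            // split_off leaves the first CHUNK_SIZE statuses in place and
            // returns the tail; swap the tail back in and emit the chunk.
            let rest = update.updated_statuses.split_off(CHUNK_SIZE);
            let chunk = std::mem::replace(&mut update.updated_statuses, rest);
            messages.push(UpdateRepository {
                updated_statuses: chunk,
                ..Default::default()
            });
        }
        // The final message carries the remainder plus the removals.
        messages.push(update);
        messages
    }

    fn main() {
        let update = UpdateRepository {
            updated_statuses: (0..600).map(|i| i.to_string()).collect(),
            removed_statuses: vec!["a.txt".into()],
        };
        let messages = split_repository_update(update);
        assert_eq!(messages.len(), 3);
        assert_eq!(messages[2].removed_statuses.len(), 1);
    }
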
@@ -1493,21 +1496,23 @@ fn notify_rejoined_projects( } } - for language_server in &project.language_servers { + for repository in mem::take(&mut project.updated_repositories) { + for update in split_repository_update(repository) { + session.peer.send(session.connection_id, update)?; + } + } + + for id in mem::take(&mut project.removed_repositories) { session.peer.send( session.connection_id, - proto::UpdateLanguageServer { + proto::RemoveRepository { project_id: project.id.to_proto(), - language_server_id: language_server.id, - variant: Some( - proto::update_language_server::Variant::DiskBasedDiagnosticsUpdated( - proto::LspDiskBasedDiagnosticsUpdated {}, - ), - ), + id, }, )?; } } + Ok(()) } @@ -1893,7 +1898,7 @@ fn join_project_internal( removed_entries: Default::default(), scan_id: worktree.scan_id, is_last_update: worktree.scan_id == worktree.completed_scan_id, - updated_repositories: worktree.repository_entries.into_values().collect(), + updated_repositories: worktree.legacy_repository_entries.into_values().collect(), removed_repositories: Default::default(), }; for update in proto::split_worktree_update(message) { @@ -1926,6 +1931,12 @@ fn join_project_internal( } } + for repository in mem::take(&mut project.repositories) { + for update in split_repository_update(repository) { + session.peer.send(session.connection_id, update)?; + } + } + for language_server in &project.language_servers { session.peer.send( session.connection_id, @@ -2018,6 +2029,54 @@ async fn update_worktree( Ok(()) } +async fn update_repository( + request: proto::UpdateRepository, + response: Response, + session: Session, +) -> Result<()> { + let guest_connection_ids = session + .db() + .await + .update_repository(&request, session.connection_id) + .await?; + + broadcast( + Some(session.connection_id), + guest_connection_ids.iter().copied(), + |connection_id| { + session + .peer + .forward_send(session.connection_id, connection_id, request.clone()) + }, + ); + response.send(proto::Ack {})?; + Ok(()) +} + +async fn remove_repository( + request: proto::RemoveRepository, + response: Response, + session: Session, +) -> Result<()> { + let guest_connection_ids = session + .db() + .await + .remove_repository(&request, session.connection_id) + .await?; + + broadcast( + Some(session.connection_id), + guest_connection_ids.iter().copied(), + |connection_id| { + session + .peer + .forward_send(session.connection_id, connection_id, request.clone()) + }, + ); + response.send(proto::Ack {})?; + Ok(()) +} + /// Updates other participants with changes to the diagnostics async fn update_diagnostic_summary( message: proto::UpdateDiagnosticSummary, diff --git a/crates/collab/src/tests/integration_tests.rs b/crates/collab/src/tests/integration_tests.rs index 81134502ead67d3fd3e69c38da2acea435a842b6..5f0efd7ecf72192958a9a1eb72bad6f4229c5cf5 100644 --- a/crates/collab/src/tests/integration_tests.rs +++ b/crates/collab/src/tests/integration_tests.rs @@ -2847,7 +2847,7 @@ async fn test_git_diff_base_change( }); } -#[gpui::test] +#[gpui::test(iterations = 10)] async fn test_git_branch_name( executor: BackgroundExecutor, cx_a: &mut TestAppContext, diff --git a/crates/project/src/connection_manager.rs b/crates/project/src/connection_manager.rs index c3611089bc74f384cfe34de9230005681bdbcff4..72806cd9772539f70292644c52be470df4526825 100644 --- a/crates/project/src/connection_manager.rs +++ b/crates/project/src/connection_manager.rs @@ -86,18 +86,25 @@ impl Manager { let project = handle.read(cx); let project_id = project.remote_id()?; 
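When rejoining, the client reports one (id, scan_id) pair per worktree and, with this PR, per repository; the server replays only rows whose scan_id is newer, turning tombstones into removals and live rows into updates (the filters in rooms.rs above). A self-contained sketch of that delta computation over a simplified row type:

    // Illustrative stand-in for a project_repositories row on the server.
    struct RepoRow {
        id: u64,
        scan_id: u64,
        is_deleted: bool,
    }

    // `client` holds what the rejoining peer sent: one
    // (repository id, last synced scan_id) pair per known repository.
    fn rejoin_delta(rows: &[RepoRow], client: &[(u64, u64)]) -> (Vec<u64>, Vec<u64>) {
        let mut updated = Vec::new();
        let mut removed = Vec::new();
        for row in rows {
            let last_seen = client.iter().find(|(id, _)| *id == row.id).map(|(_, s)| *s);
            let needs_replay = match last_seen {
                // Known repository: replay anything newer than the client's scan.
                Some(scan_id) => row.scan_id > scan_id,
                // Unknown repository: send it unless it is already a tombstone.
                None => !row.is_deleted,
            };
            if needs_replay {
                if row.is_deleted {
                    removed.push(row.id);
                } else {
                    updated.push(row.id);
                }
            }
        }
        (updated, removed)
    }

    fn main() {
        let rows = vec![
            RepoRow { id: 1, scan_id: 12, is_deleted: false },
            RepoRow { id: 2, scan_id: 12, is_deleted: true },
        ];
        let (updated, removed) = rejoin_delta(&rows, &[(1, 12), (2, 10)]);
        assert!(updated.is_empty() && removed == vec![2]);
    }
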
projects.insert(project_id, handle.clone());
+                let mut worktrees = Vec::new();
+                let mut repositories = Vec::new();
+                for worktree in project.worktrees(cx) {
+                    let worktree = worktree.read(cx);
+                    worktrees.push(proto::RejoinWorktree {
+                        id: worktree.id().to_proto(),
+                        scan_id: worktree.completed_scan_id() as u64,
+                    });
+                    for repository in worktree.repositories().iter() {
+                        repositories.push(proto::RejoinRepository {
+                            id: repository.work_directory_id().to_proto(),
+                            scan_id: worktree.completed_scan_id() as u64,
+                        });
+                    }
+                }
                 Some(proto::RejoinProject {
                     id: project_id,
-                    worktrees: project
-                        .worktrees(cx)
-                        .map(|worktree| {
-                            let worktree = worktree.read(cx);
-                            proto::RejoinWorktree {
-                                id: worktree.id().to_proto(),
-                                scan_id: worktree.completed_scan_id() as u64,
-                            }
-                        })
-                        .collect(),
+                    worktrees,
+                    repositories,
                 })
             } else {
                 None
diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs
index 9bc994cfddc4d87f9c3a839185d55b9438f6072e..a1d0fb60022e944804cbe3e6556c6c42e38005c4 100644
--- a/crates/project/src/project.rs
+++ b/crates/project/src/project.rs
@@ -780,6 +780,8 @@ impl Project {
         client.add_entity_message_handler(Self::handle_unshare_project);
         client.add_entity_request_handler(Self::handle_update_buffer);
         client.add_entity_message_handler(Self::handle_update_worktree);
+        client.add_entity_message_handler(Self::handle_update_repository);
+        client.add_entity_message_handler(Self::handle_remove_repository);
         client.add_entity_request_handler(Self::handle_synchronize_buffers);
 
         client.add_entity_request_handler(Self::handle_search_candidate_buffers);
@@ -1121,6 +1123,8 @@ impl Project {
         ssh_proto.add_entity_message_handler(Self::handle_create_buffer_for_peer);
         ssh_proto.add_entity_message_handler(Self::handle_update_worktree);
+        ssh_proto.add_entity_message_handler(Self::handle_update_repository);
+        ssh_proto.add_entity_message_handler(Self::handle_remove_repository);
         ssh_proto.add_entity_message_handler(Self::handle_update_project);
         ssh_proto.add_entity_message_handler(Self::handle_toast);
         ssh_proto.add_entity_request_handler(Self::handle_language_server_prompt_request);
@@ -4029,28 +4033,13 @@ impl Project {
     }
 
     pub fn project_path_for_absolute_path(&self, abs_path: &Path, cx: &App) -> Option<ProjectPath> {
-        self.find_local_worktree(abs_path, cx)
+        self.find_worktree(abs_path, cx)
             .map(|(worktree, relative_path)| ProjectPath {
                 worktree_id: worktree.read(cx).id(),
                 path: relative_path.into(),
             })
     }
 
-    pub fn find_local_worktree(
-        &self,
-        abs_path: &Path,
-        cx: &App,
-    ) -> Option<(Entity<Worktree>, PathBuf)> {
-        let trees = self.worktrees(cx);
-
-        for tree in trees {
-            if let Some(relative_path) = abs_path.strip_prefix(tree.read(cx).abs_path()).ok() {
-                return Some((tree.clone(), relative_path.into()));
-            }
-        }
-        None
-    }
-
     pub fn get_workspace_root(&self, project_path: &ProjectPath, cx: &App) -> Option<PathBuf> {
         Some(
             self.worktree_for_id(project_path.worktree_id, cx)?
@@ -4299,7 +4288,43 @@ impl Project {
             if let Some(worktree) = this.worktree_for_id(worktree_id, cx) {
                 worktree.update(cx, |worktree, _| {
                     let worktree = worktree.as_remote_mut().unwrap();
-                    worktree.update_from_remote(envelope.payload);
+                    worktree.update_from_remote(envelope.payload.into());
                 });
             }
             Ok(())
         })?
+    }
+
+    async fn handle_update_repository(
+        this: Entity<Self>,
+        envelope: TypedEnvelope<proto::UpdateRepository>,
+        mut cx: AsyncApp,
+    ) -> Result<()> {
+        this.update(&mut cx, |this, cx| {
+            if let Some((worktree, _relative_path)) =
+                this.find_worktree(envelope.payload.abs_path.as_ref(), cx)
+            {
+                worktree.update(cx, |worktree, _| {
+                    let worktree = worktree.as_remote_mut().unwrap();
+                    worktree.update_from_remote(envelope.payload.into());
+                });
+            }
+            Ok(())
+        })?
+    }
+
+    async fn handle_remove_repository(
+        this: Entity<Self>,
+        envelope: TypedEnvelope<proto::RemoveRepository>,
+        mut cx: AsyncApp,
+    ) -> Result<()> {
+        this.update(&mut cx, |this, cx| {
+            if let Some(worktree) =
+                this.worktree_for_entry(ProjectEntryId::from_proto(envelope.payload.id), cx)
+            {
+                worktree.update(cx, |worktree, _| {
+                    let worktree = worktree.as_remote_mut().unwrap();
+                    worktree.update_from_remote(envelope.payload.into());
                 });
             }
             Ok(())
diff --git a/crates/project/src/worktree_store.rs b/crates/project/src/worktree_store.rs
index ee8539c73813f04360854a7b28254219abae8489..3c2cfe547c3009bc7eb2f3258615b3b4f67d95e9 100644
--- a/crates/project/src/worktree_store.rs
+++ b/crates/project/src/worktree_store.rs
@@ -570,12 +570,44 @@ impl WorktreeStore {
                         let client = client.clone();
                         async move {
                             if client.is_via_collab() {
-                                client
-                                    .request(update)
-                                    .map(|result| result.log_err().is_some())
-                                    .await
+                                match update {
+                                    proto::WorktreeRelatedMessage::UpdateWorktree(
+                                        update,
+                                    ) => {
+                                        client
+                                            .request(update)
+                                            .map(|result| result.log_err().is_some())
+                                            .await
+                                    }
+                                    proto::WorktreeRelatedMessage::UpdateRepository(
+                                        update,
+                                    ) => {
+                                        client
+                                            .request(update)
+                                            .map(|result| result.log_err().is_some())
+                                            .await
+                                    }
+                                    proto::WorktreeRelatedMessage::RemoveRepository(
+                                        update,
+                                    ) => {
+                                        client
+                                            .request(update)
+                                            .map(|result| result.log_err().is_some())
+                                            .await
+                                    }
+                                }
                             } else {
-                                client.send(update).log_err().is_some()
+                                match update {
+                                    proto::WorktreeRelatedMessage::UpdateWorktree(
+                                        update,
+                                    ) => client.send(update).log_err().is_some(),
+                                    proto::WorktreeRelatedMessage::UpdateRepository(
+                                        update,
+                                    ) => client.send(update).log_err().is_some(),
+                                    proto::WorktreeRelatedMessage::RemoveRepository(
+                                        update,
+                                    ) => client.send(update).log_err().is_some(),
+                                }
                             }
                         }
                     }
diff --git a/crates/proto/proto/zed.proto b/crates/proto/proto/zed.proto
index 26f0403c81f16494a5525e61502dc5faeec09351..6573a57c8624538874c50ce70ab97c773fb6b8ad 100644
--- a/crates/proto/proto/zed.proto
+++ b/crates/proto/proto/zed.proto
@@ -355,9 +355,10 @@ message Envelope {
         RefreshCodeLens refresh_code_lens = 325;
         ToggleBreakpoint toggle_breakpoint = 326;
 
-        BreakpointsForFile breakpoints_for_file = 327; // current max
-
+        BreakpointsForFile breakpoints_for_file = 327;
+        UpdateRepository update_repository = 328;
+        RemoveRepository remove_repository = 329; // current max
     }
 
     reserved 87 to 88;
@@ -455,6 +456,7 @@ message RejoinRemoteProjectsResponse {
 message RejoinProject {
     uint64 id = 1;
     repeated RejoinWorktree worktrees = 2;
+    repeated RejoinRepository repositories = 3;
 }
 
 message RejoinWorktree {
@@ -462,6 +464,11 @@ message RejoinWorktree {
     uint64 scan_id = 2;
 }
 
+message RejoinRepository {
+    uint64 id = 1;
+    uint64 scan_id = 2;
+}
+
 message RejoinRoomResponse {
     Room room = 1;
     repeated ResharedProject reshared_projects = 2;
@@ -637,8 +644,8 @@ message UpdateWorktree {
     string root_name = 3;
     repeated Entry updated_entries = 4;
     repeated uint64 removed_entries = 5;
-    repeated RepositoryEntry updated_repositories = 6;
-    repeated uint64 removed_repositories = 7;
+    repeated RepositoryEntry updated_repositories = 6; // deprecated
+    repeated uint64 removed_repositories = 7; // deprecated
     uint64 scan_id = 8;
     bool is_last_update = 9;
     string abs_path = 10;
@@ -1900,13 +1907,29 @@ message Entry {
 
 message RepositoryEntry {
     uint64 work_directory_id = 1;
-    optional string branch = 2; // deprecated
-    optional Branch branch_summary = 6;
+    reserved 2;
     repeated StatusEntry updated_statuses = 3;
     repeated string removed_statuses = 4;
     repeated string current_merge_conflicts = 5;
+    optional Branch branch_summary = 6;
 }
 
+message UpdateRepository {
+    uint64 project_id = 1;
+    uint64 id = 2;
+    string abs_path = 3;
+    repeated uint64 entry_ids = 4;
+    optional Branch branch_summary = 5;
+    repeated StatusEntry updated_statuses = 6;
+    repeated string removed_statuses = 7;
+    repeated string current_merge_conflicts = 8;
+    uint64 scan_id = 9;
+}
+
+message RemoveRepository {
+    uint64 project_id = 1;
+    uint64 id = 2;
+}
+
 message StatusEntry {
     string repo_path = 1;
diff --git a/crates/proto/src/proto.rs b/crates/proto/src/proto.rs
index a612dcaac5740c54b1bdc1a416cf18d8c626d6e7..72e476cd95a08003ebd8baa37b6f65c5cab5d977 100644
--- a/crates/proto/src/proto.rs
+++ b/crates/proto/src/proto.rs
@@ -445,6 +445,8 @@ messages!(
     (UpdateUserPlan, Foreground),
     (UpdateWorktree, Foreground),
     (UpdateWorktreeSettings, Foreground),
+    (UpdateRepository, Foreground),
+    (RemoveRepository, Foreground),
     (UsersResponse, Foreground),
     (GitReset, Background),
     (GitCheckoutFiles, Background),
@@ -573,6 +575,8 @@ request_messages!(
     (UpdateParticipantLocation, Ack),
     (UpdateProject, Ack),
     (UpdateWorktree, Ack),
+    (UpdateRepository, Ack),
+    (RemoveRepository, Ack),
     (LspExtExpandMacro, LspExtExpandMacroResponse),
     (LspExtOpenDocs, LspExtOpenDocsResponse),
     (SetRoomParticipantRole, Ack),
@@ -689,6 +693,8 @@ entity_messages!(
     UpdateProject,
     UpdateProjectCollaborator,
     UpdateWorktree,
+    UpdateRepository,
+    RemoveRepository,
     UpdateWorktreeSettings,
     LspExtExpandMacro,
     LspExtOpenDocs,
@@ -783,6 +789,31 @@ pub const MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE: usize = 2;
 #[cfg(not(any(test, feature = "test-support")))]
 pub const MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE: usize = 256;
 
+#[derive(Clone, Debug)]
+pub enum WorktreeRelatedMessage {
+    UpdateWorktree(UpdateWorktree),
+    UpdateRepository(UpdateRepository),
+    RemoveRepository(RemoveRepository),
+}
+
+impl From<UpdateWorktree> for WorktreeRelatedMessage {
+    fn from(value: UpdateWorktree) -> Self {
+        Self::UpdateWorktree(value)
+    }
+}
+
+impl From<UpdateRepository> for WorktreeRelatedMessage {
+    fn from(value: UpdateRepository) -> Self {
+        Self::UpdateRepository(value)
+    }
+}
+
+impl From<RemoveRepository> for WorktreeRelatedMessage {
+    fn from(value: RemoveRepository) -> Self {
+        Self::RemoveRepository(value)
+    }
+}
+
 pub fn split_worktree_update(mut message: UpdateWorktree) -> impl Iterator<Item = UpdateWorktree> {
     let mut done = false;
 
@@ -817,7 +848,6 @@ pub fn split_worktree_update(mut message: UpdateWorktree) -> impl Iterator<Item
-            updated_repositories,
             ..message.clone()
         })
     })
 }
 
+pub fn split_repository_update(
+    mut update: UpdateRepository,
+) -> impl Iterator<Item = UpdateRepository> {
+    let mut updated_statuses_iter = mem::take(&mut update.updated_statuses).into_iter().fuse();
+    let mut removed_statuses_iter = mem::take(&mut update.removed_statuses).into_iter().fuse();
+    let mut is_first = true;
+    std::iter::from_fn(move || {
+        let updated_statuses = updated_statuses_iter
+            .by_ref()
+            .take(MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE)
+            .collect::<Vec<_>>();
+        let removed_statuses = removed_statuses_iter
+            .by_ref()
+            .take(MAX_WORKTREE_UPDATE_MAX_CHUNK_SIZE)
+            .collect::<Vec<_>>();
+        if updated_statuses.is_empty() && removed_statuses.is_empty() && !is_first {
+            return None;
+        }
+        is_first = false;
+        Some(UpdateRepository {
+            updated_statuses,
+            removed_statuses,
+            ..update.clone()
+        })
+    })
+}
+
+pub fn split_worktree_related_message(
+    message: WorktreeRelatedMessage,
+) -> Box<dyn Iterator<Item = WorktreeRelatedMessage> + Send> {
+    match message {
+        WorktreeRelatedMessage::UpdateWorktree(message) => {
+            Box::new(split_worktree_update(message).map(WorktreeRelatedMessage::UpdateWorktree))
+        }
+        WorktreeRelatedMessage::UpdateRepository(message) => {
+            Box::new(split_repository_update(message).map(WorktreeRelatedMessage::UpdateRepository))
+        }
+        WorktreeRelatedMessage::RemoveRepository(update) => Box::new([update.into()].into_iter()),
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/crates/remote_server/src/remote_editing_tests.rs b/crates/remote_server/src/remote_editing_tests.rs
index 35e87fbe93014f6749a1448074c154cfcc44394b..bc95326e7c5594ce8733202636a7ec1d1c30dfa0 100644
--- a/crates/remote_server/src/remote_editing_tests.rs
+++ b/crates/remote_server/src/remote_editing_tests.rs
@@ -1318,7 +1318,7 @@ async fn test_remote_git_diffs(cx: &mut TestAppContext, server_cx: &mut TestAppC
 async fn test_remote_git_branches(cx: &mut TestAppContext, server_cx: &mut TestAppContext) {
     let fs = FakeFs::new(server_cx.executor());
     fs.insert_tree(
-        "/code",
+        path!("/code"),
         json!({
             "project1": {
                 ".git": {},
@@ -1334,11 +1334,11 @@ async fn test_remote_git_branches(cx: &mut TestAppContext, server_cx: &mut TestA
         .iter()
         .map(ToString::to_string)
         .collect::<Vec<_>>();
-    fs.insert_branches(Path::new("/code/project1/.git"), &branches);
+    fs.insert_branches(Path::new(path!("/code/project1/.git")), &branches);
 
     let (worktree, _) = project
         .update(cx, |project, cx| {
-            project.find_or_create_worktree("/code/project1", true, cx)
+            project.find_or_create_worktree(path!("/code/project1"), true, cx)
         })
         .await
         .unwrap();
diff --git a/crates/worktree/src/worktree.rs b/crates/worktree/src/worktree.rs
index a688d45ddb875d300588687bc3a04551f55331af..85e9dd1a9c54e8c431288b6012098ad7fe07eef3 100644
--- a/crates/worktree/src/worktree.rs
+++ b/crates/worktree/src/worktree.rs
@@ -41,7 +41,7 @@ use postage::{
     watch,
 };
 use rpc::{
-    proto::{self, split_worktree_update, FromProto, ToProto},
+    proto::{self, split_worktree_related_message, FromProto, ToProto, WorktreeRelatedMessage},
     AnyProtoClient,
 };
 pub use settings::WorktreeId;
@@ -140,12 +140,12 @@ struct ScanRequest {
 
 pub struct RemoteWorktree {
     snapshot: Snapshot,
-    background_snapshot: Arc<Mutex<(Snapshot, Vec<proto::UpdateWorktree>)>>,
+    background_snapshot: Arc<Mutex<(Snapshot, Vec<WorktreeRelatedMessage>)>>,
     project_id: u64,
     client: AnyProtoClient,
     file_scan_inclusions: PathMatcher,
-    updates_tx: Option<UnboundedSender<proto::UpdateWorktree>>,
-    update_observer: Option<UnboundedSender<proto::UpdateWorktree>>,
+    updates_tx: Option<UnboundedSender<WorktreeRelatedMessage>>,
+    update_observer: Option<UnboundedSender<WorktreeRelatedMessage>>,
     snapshot_subscriptions: VecDeque<(usize, oneshot::Sender<()>)>,
     replica_id: ReplicaId,
     visible: bool,
@@ -200,6 +200,7 @@ pub struct RepositoryEntry {
     pub(crate) statuses_by_path: SumTree<StatusEntry>,
     work_directory_id: ProjectEntryId,
     pub work_directory: WorkDirectory,
+    work_directory_abs_path: PathBuf,
     pub(crate) current_branch: Option<Branch>,
     pub current_merge_conflicts: TreeSet<RepoPath>,
 }
@@ -247,13 +248,12 @@ impl RepositoryEntry {
             .cloned()
     }
 
-    pub fn initial_update(&self) -> proto::RepositoryEntry {
-        proto::RepositoryEntry {
-            work_directory_id: self.work_directory_id.to_proto(),
-            branch: self
-                .current_branch
-                .as_ref()
-                .map(|branch| branch.name.to_string()),
+    pub fn initial_update(
+        &self,
+        project_id: u64,
+        worktree_scan_id: usize,
+    ) -> proto::UpdateRepository {
+        proto::UpdateRepository {
             branch_summary: self.current_branch.as_ref().map(branch_to_proto),
             updated_statuses: self
                 .statuses_by_path
@@ -266,10 +266,26 @@ impl RepositoryEntry {
                 .iter()
.map(|repo_path| repo_path.to_proto())
                 .collect(),
+            project_id,
+            // This is semantically wrong---we want to move to having separate IDs for repositories.
+            // But for the moment, RepositoryEntry isn't set up to provide that at this level, so we
+            // shim it using the work directory's project entry ID. The pair of this + project ID will
+            // be globally unique.
+            id: self.work_directory_id().to_proto(),
+            abs_path: self.work_directory_abs_path.as_path().to_proto(),
+            entry_ids: vec![self.work_directory_id().to_proto()],
+            // This is also semantically wrong, and should be replaced once we separate git repo updates
+            // from worktree scans.
+            scan_id: worktree_scan_id as u64,
         }
     }
 
-    pub fn build_update(&self, old: &Self) -> proto::RepositoryEntry {
+    pub fn build_update(
+        &self,
+        old: &Self,
+        project_id: u64,
+        scan_id: usize,
+    ) -> proto::UpdateRepository {
         let mut updated_statuses: Vec<proto::StatusEntry> = Vec::new();
         let mut removed_statuses: Vec<String> = Vec::new();
 
@@ -311,12 +327,7 @@ impl RepositoryEntry {
             }
         }
 
-        proto::RepositoryEntry {
-            work_directory_id: self.work_directory_id.to_proto(),
-            branch: self
-                .current_branch
-                .as_ref()
-                .map(|branch| branch.name.to_string()),
+        proto::UpdateRepository {
             branch_summary: self.current_branch.as_ref().map(branch_to_proto),
             updated_statuses,
             removed_statuses,
@@ -325,6 +336,11 @@ impl RepositoryEntry {
                 .iter()
                 .map(|path| path.as_ref().to_proto())
                 .collect(),
+            project_id,
+            id: self.work_directory_id.to_proto(),
+            abs_path: self.work_directory_abs_path.as_path().to_proto(),
+            entry_ids: vec![self.work_directory_id.to_proto()],
+            scan_id: scan_id as u64,
         }
     }
 }
@@ -808,8 +824,12 @@ impl Worktree {
             Arc::<Path>::from_proto(worktree.abs_path),
         );
 
-        let background_snapshot = Arc::new(Mutex::new((snapshot.clone(), Vec::new())));
-        let (background_updates_tx, mut background_updates_rx) = mpsc::unbounded();
+        let background_snapshot = Arc::new(Mutex::new((
+            snapshot.clone(),
+            Vec::<WorktreeRelatedMessage>::new(),
+        )));
+        let (background_updates_tx, mut background_updates_rx) =
+            mpsc::unbounded::<WorktreeRelatedMessage>();
         let (mut snapshot_updated_tx, mut snapshot_updated_rx) = watch::channel();
         let worktree_id = snapshot.id();
 
@@ -839,12 +859,9 @@ impl Worktree {
                 while let Some(update) = background_updates_rx.next().await {
                     {
                         let mut lock = background_snapshot.lock();
-                        if let Err(error) = lock
-                            .0
+                        lock.0
                             .apply_remote_update(update.clone(), &settings.file_scan_inclusions)
-                        {
-                            log::error!("error applying worktree update: {}", error);
-                        }
+                            .log_err();
                         lock.1.push(update);
                     }
                     snapshot_updated_tx.send(()).await.ok();
@@ -864,16 +881,18 @@ impl Worktree {
                     let mut lock = this.background_snapshot.lock();
                     this.snapshot = lock.0.clone();
                     for update in lock.1.drain(..) {
-                        if !update.updated_entries.is_empty()
-                            || !update.removed_entries.is_empty()
-                        {
-                            entries_changed = true;
-                        }
-                        if !update.updated_repositories.is_empty()
-                            || !update.removed_repositories.is_empty()
-                        {
-                            git_repos_changed = true;
-                        }
+                        entries_changed |= match &update {
+                            WorktreeRelatedMessage::UpdateWorktree(update_worktree) => {
+                                !update_worktree.updated_entries.is_empty()
+                                    || !update_worktree.removed_entries.is_empty()
+                            }
+                            _ => false,
+                        };
+                        git_repos_changed |= matches!(
+                            update,
+                            WorktreeRelatedMessage::UpdateRepository(_)
+                                | WorktreeRelatedMessage::RemoveRepository(_)
+                        );
                         if let Some(tx) = &this.update_observer {
                             tx.unbounded_send(update).ok();
                         }
                     }
@@ -1010,7 +1029,7 @@ impl Worktree {
 
     pub fn observe_updates<F, Fut>(&mut self, project_id: u64, cx: &Context<Self>, callback: F)
     where
-        F: 'static + Send + Fn(proto::UpdateWorktree) -> Fut,
+        F: 'static + Send + Fn(WorktreeRelatedMessage) -> Fut,
         Fut: 'static + Send + Future<Output = bool>,
     {
         match self {
@@ -2289,8 +2308,8 @@ impl LocalWorktree {
     fn observe_updates<F, Fut>(&mut self, project_id: u64, cx: &Context<Worktree>, callback: F)
     where
-        F: 'static + Send + Fn(proto::UpdateWorktree) -> Fut,
-        Fut: Send + Future<Output = bool>,
+        F: 'static + Send + Fn(WorktreeRelatedMessage) -> Fut,
+        Fut: 'static + Send + Future<Output = bool>,
     {
         if let Some(observer) = self.update_observer.as_mut() {
             *observer.resume_updates.borrow_mut() = ();
@@ -2308,14 +2327,17 @@ impl LocalWorktree {
         let _maintain_remote_snapshot = cx.background_spawn(async move {
             let mut is_first = true;
             while let Some((snapshot, entry_changes, repo_changes)) = snapshots_rx.next().await {
-                let update = if is_first {
+                let updates = if is_first {
                     is_first = false;
                     snapshot.build_initial_update(project_id, worktree_id)
                 } else {
                     snapshot.build_update(project_id, worktree_id, entry_changes, repo_changes)
                 };
 
-                for update in proto::split_worktree_update(update) {
+                for update in updates
+                    .into_iter()
+                    .flat_map(proto::split_worktree_related_message)
+                {
                     let _ = resume_updates_rx.try_recv();
                     loop {
                         let result = callback(update.clone());
@@ -2378,7 +2400,7 @@ impl RemoteWorktree {
         self.disconnected = true;
     }
 
-    pub fn update_from_remote(&self, update: proto::UpdateWorktree) {
+    pub fn update_from_remote(&self, update: WorktreeRelatedMessage) {
         if let Some(updates_tx) = &self.updates_tx {
             updates_tx
                 .unbounded_send(update)
@@ -2388,29 +2410,41 @@ impl RemoteWorktree {
     fn observe_updates<F, Fut>(&mut self, project_id: u64, cx: &Context<Worktree>, callback: F)
     where
-        F: 'static + Send + Fn(proto::UpdateWorktree) -> Fut,
+        F: 'static + Send + Fn(WorktreeRelatedMessage) -> Fut,
         Fut: 'static + Send + Future<Output = bool>,
     {
         let (tx, mut rx) = mpsc::unbounded();
-        let initial_update = self
+        let initial_updates = self
             .snapshot
             .build_initial_update(project_id, self.id().to_proto());
         self.update_observer = Some(tx);
         cx.spawn(async move |this, cx| {
-            let mut update = initial_update;
+            let mut updates = initial_updates;
             'outer: loop {
-                // SSH projects use a special project ID of 0, and we need to
-                // remap it to the correct one here.
-                update.project_id = project_id;
+                for mut update in updates {
+                    // SSH projects use a special project ID of 0, and we need to
+                    // remap it to the correct one here.
+                    match &mut update {
+                        WorktreeRelatedMessage::UpdateWorktree(update_worktree) => {
+                            update_worktree.project_id = project_id;
+                        }
+                        WorktreeRelatedMessage::UpdateRepository(update_repository) => {
+                            update_repository.project_id = project_id;
+                        }
+                        WorktreeRelatedMessage::RemoveRepository(remove_repository) => {
+                            remove_repository.project_id = project_id;
+                        }
+                    };
 
-                for chunk in split_worktree_update(update) {
-                    if !callback(chunk).await {
-                        break 'outer;
+                    for chunk in split_worktree_related_message(update) {
+                        if !callback(chunk).await {
+                            break 'outer;
+                        }
                     }
                 }
 
                 if let Some(next_update) = rx.next().await {
-                    update = next_update;
+                    updates = vec![next_update];
                 } else {
                     break;
                 }
@@ -2570,7 +2604,11 @@ impl Snapshot {
         self.abs_path.as_path()
     }
 
-    fn build_initial_update(&self, project_id: u64, worktree_id: u64) -> proto::UpdateWorktree {
+    fn build_initial_update(
+        &self,
+        project_id: u64,
+        worktree_id: u64,
+    ) -> Vec<WorktreeRelatedMessage> {
         let mut updated_entries = self
             .entries_by_path
             .iter()
@@ -2578,14 +2616,7 @@
             .collect::<Vec<_>>();
         updated_entries.sort_unstable_by_key(|e| e.id);
 
-        let mut updated_repositories = self
-            .repositories
-            .iter()
-            .map(|repository| repository.initial_update())
-            .collect::<Vec<_>>();
-        updated_repositories.sort_unstable_by_key(|e| e.work_directory_id);
-
-        proto::UpdateWorktree {
+        [proto::UpdateWorktree {
             project_id,
             worktree_id,
             abs_path: self.abs_path().to_proto(),
@@ -2594,9 +2625,18 @@
             removed_entries: Vec::new(),
             scan_id: self.scan_id as u64,
             is_last_update: self.completed_scan_id == self.scan_id,
-            updated_repositories,
+            // Sent in separate messages.
+            updated_repositories: Vec::new(),
             removed_repositories: Vec::new(),
         }
+        .into()]
+        .into_iter()
+        .chain(
+            self.repositories
+                .iter()
+                .map(|repository| repository.initial_update(project_id, self.scan_id).into()),
+        )
+        .collect()
     }
 
     pub fn absolutize(&self, path: &Path) -> Result<PathBuf> {
@@ -2678,9 +2718,97 @@ impl Snapshot {
         }
     }
 
-    pub(crate) fn apply_remote_update(
+    pub(crate) fn apply_update_repository(
         &mut self,
-        mut update: proto::UpdateWorktree,
+        update: proto::UpdateRepository,
+    ) -> Result<()> {
+        // NOTE: this is practically but not semantically correct. For now we're using the
+        // ID field to store the work directory ID, but eventually it will be a different
+        // kind of ID.
+        let work_directory_id = ProjectEntryId::from_proto(update.id);
+
+        if let Some(work_dir_entry) = self.entry_for_id(work_directory_id) {
+            let conflicted_paths = TreeSet::from_ordered_entries(
+                update
+                    .current_merge_conflicts
+                    .into_iter()
+                    .map(|path| RepoPath(Path::new(&path).into())),
+            );
+
+            if self
+                .repositories
+                .contains(&PathKey(work_dir_entry.path.clone()), &())
+            {
+                let edits = update
+                    .removed_statuses
+                    .into_iter()
+                    .map(|path| Edit::Remove(PathKey(FromProto::from_proto(path))))
+                    .chain(
+                        update
+                            .updated_statuses
+                            .into_iter()
+                            .filter_map(|updated_status| {
+                                Some(Edit::Insert(updated_status.try_into().log_err()?))
+                            }),
+                    )
+                    .collect::<Vec<_>>();
+
+                self.repositories
+                    .update(&PathKey(work_dir_entry.path.clone()), &(), |repo| {
+                        repo.current_branch = update.branch_summary.as_ref().map(proto_to_branch);
+                        repo.statuses_by_path.edit(edits, &());
+                        repo.current_merge_conflicts = conflicted_paths
+                    });
+            } else {
+                let statuses = SumTree::from_iter(
+                    update
+                        .updated_statuses
+                        .into_iter()
+                        .filter_map(|updated_status| updated_status.try_into().log_err()),
+                    &(),
+                );
+
+                self.repositories.insert_or_replace(
+                    RepositoryEntry {
+                        work_directory_id,
+                        // When syncing repository entries from a peer, we don't need
+                        // the location_in_repo field, since git operations don't happen locally
+                        // anyway.
+                        work_directory: WorkDirectory::InProject {
+                            relative_path: work_dir_entry.path.clone(),
+                        },
+                        current_branch: update.branch_summary.as_ref().map(proto_to_branch),
+                        statuses_by_path: statuses,
+                        current_merge_conflicts: conflicted_paths,
+                        work_directory_abs_path: update.abs_path.into(),
+                    },
+                    &(),
+                );
+            }
+        } else {
+            log::error!("no work directory entry for repository {:?}", update.id)
+        }
+
+        Ok(())
+    }
+
+    pub(crate) fn apply_remove_repository(
+        &mut self,
+        update: proto::RemoveRepository,
+    ) -> Result<()> {
+        // NOTE: this is practically but not semantically correct. For now we're using the
+        // ID field to store the work directory ID, but eventually it will be a different
+        // kind of ID.
+        let work_directory_id = ProjectEntryId::from_proto(update.id);
+        self.repositories.retain(&(), |entry: &RepositoryEntry| {
+            entry.work_directory_id != work_directory_id
+        });
+        Ok(())
+    }
+
+    pub(crate) fn apply_update_worktree(
+        &mut self,
+        update: proto::UpdateWorktree,
         always_included_paths: &PathMatcher,
     ) -> Result<()> {
         log::debug!(
@@ -2726,79 +2854,6 @@
         self.entries_by_path.edit(entries_by_path_edits, &());
         self.entries_by_id.edit(entries_by_id_edits, &());
 
-        update.removed_repositories.sort_unstable();
-        self.repositories.retain(&(), |entry: &RepositoryEntry| {
-            update
-                .removed_repositories
-                .binary_search(&entry.work_directory_id.to_proto())
-                .is_err()
-        });
-
-        for repository in update.updated_repositories {
-            let work_directory_id = ProjectEntryId::from_proto(repository.work_directory_id);
-            if let Some(work_dir_entry) = self.entry_for_id(work_directory_id) {
-                let conflicted_paths = TreeSet::from_ordered_entries(
-                    repository
-                        .current_merge_conflicts
-                        .into_iter()
-                        .map(|path| RepoPath(Path::new(&path).into())),
-                );
-
-                if self
-                    .repositories
-                    .contains(&PathKey(work_dir_entry.path.clone()), &())
-                {
-                    let edits = repository
-                        .removed_statuses
-                        .into_iter()
-                        .map(|path| Edit::Remove(PathKey(FromProto::from_proto(path))))
-                        .chain(repository.updated_statuses.into_iter().filter_map(
-                            |updated_status| {
-                                Some(Edit::Insert(updated_status.try_into().log_err()?))
-                            },
-                        ))
-                        .collect::<Vec<_>>();
-
-                    self.repositories
-                        .update(&PathKey(work_dir_entry.path.clone()), &(), |repo| {
-                            repo.current_branch =
-                                repository.branch_summary.as_ref().map(proto_to_branch);
-                            repo.statuses_by_path.edit(edits, &());
-                            repo.current_merge_conflicts = conflicted_paths
-                        });
-                } else {
-                    let statuses = SumTree::from_iter(
-                        repository
-                            .updated_statuses
-                            .into_iter()
-                            .filter_map(|updated_status| updated_status.try_into().log_err()),
-                        &(),
-                    );
-
-                    self.repositories.insert_or_replace(
-                        RepositoryEntry {
-                            work_directory_id,
-                            // When syncing repository entries from a peer, we don't need
-                            // the location_in_repo field, since git operations don't happen locally
-                            // anyway.
-                            work_directory: WorkDirectory::InProject {
-                                relative_path: work_dir_entry.path.clone(),
-                            },
-                            current_branch: repository.branch_summary.as_ref().map(proto_to_branch),
-                            statuses_by_path: statuses,
-                            current_merge_conflicts: conflicted_paths,
-                        },
-                        &(),
-                    );
-                }
-            } else {
-                log::error!(
-                    "no work directory entry for repository {:?}",
-                    repository.work_directory_id
-                )
-            }
-        }
-
         self.scan_id = update.scan_id as usize;
         if update.is_last_update {
             self.completed_scan_id = update.scan_id as usize;
@@ -2807,6 +2862,24 @@
         Ok(())
     }
 
+    pub(crate) fn apply_remote_update(
+        &mut self,
+        update: WorktreeRelatedMessage,
+        always_included_paths: &PathMatcher,
+    ) -> Result<()> {
+        match update {
+            WorktreeRelatedMessage::UpdateWorktree(update) => {
+                self.apply_update_worktree(update, always_included_paths)
+            }
+            WorktreeRelatedMessage::UpdateRepository(update) => {
+                self.apply_update_repository(update)
+            }
+            WorktreeRelatedMessage::RemoveRepository(update) => {
+                self.apply_remove_repository(update)
+            }
+        }
+    }
+
     pub fn entry_count(&self) -> usize {
         self.entries_by_path.summary().count
     }
@@ -3046,11 +3119,10 @@ impl LocalSnapshot {
         worktree_id: u64,
         entry_changes: UpdatedEntriesSet,
         repo_changes: UpdatedGitRepositoriesSet,
-    ) -> proto::UpdateWorktree {
+    ) -> Vec<WorktreeRelatedMessage> {
         let mut updated_entries = Vec::new();
         let mut removed_entries = Vec::new();
-        let mut updated_repositories = Vec::new();
-        let mut removed_repositories = Vec::new();
+        let mut updates = Vec::new();
 
         for (_, entry_id, path_change) in entry_changes.iter() {
             if let PathChange::Removed = path_change {
@@ -3064,13 +3136,23 @@
                 let new_repo = self.repositories.get(&PathKey(entry.path.clone()), &());
                 match (&change.old_repository, new_repo) {
                     (Some(old_repo), Some(new_repo)) => {
-                        updated_repositories.push(new_repo.build_update(old_repo));
+                        updates.push(
+                            new_repo
+                                .build_update(old_repo, project_id, self.scan_id)
+                                .into(),
+                        );
                     }
                     (None, Some(new_repo)) => {
-                        updated_repositories.push(new_repo.initial_update());
+                        updates.push(new_repo.initial_update(project_id, self.scan_id).into());
                     }
                     (Some(old_repo), None) => {
-                        removed_repositories.push(old_repo.work_directory_id.to_proto());
+                        updates.push(
+                            proto::RemoveRepository {
+                                project_id,
+                                id: old_repo.work_directory_id.to_proto(),
+                            }
+                            .into(),
+                        );
                     }
                     _ => {}
                 }
@@ -3078,24 +3160,27 @@
 
         removed_entries.sort_unstable();
         updated_entries.sort_unstable_by_key(|e| e.id);
-        removed_repositories.sort_unstable();
-        updated_repositories.sort_unstable_by_key(|e| e.work_directory_id);
 
         // TODO - optimize, knowing that removed_entries are sorted.
         removed_entries.retain(|id| updated_entries.binary_search_by_key(id, |e| e.id).is_err());
 
-        proto::UpdateWorktree {
-            project_id,
-            worktree_id,
-            abs_path: self.abs_path().to_proto(),
-            root_name: self.root_name().to_string(),
-            updated_entries,
-            removed_entries,
-            scan_id: self.scan_id as u64,
-            is_last_update: self.completed_scan_id == self.scan_id,
-            updated_repositories,
-            removed_repositories,
-        }
+        updates.push(
+            proto::UpdateWorktree {
+                project_id,
+                worktree_id,
+                abs_path: self.abs_path().to_proto(),
+                root_name: self.root_name().to_string(),
+                updated_entries,
+                removed_entries,
+                scan_id: self.scan_id as u64,
+                is_last_update: self.completed_scan_id == self.scan_id,
+                // Sent in separate messages.
+                updated_repositories: Vec::new(),
+                removed_repositories: Vec::new(),
+            }
+            .into(),
+        );
+        updates
     }
 
     fn insert_entry(&mut self, mut entry: Entry, fs: &dyn Fs) -> Entry {
@@ -3547,12 +3632,15 @@ impl BackgroundScannerState {
         watcher: &dyn Watcher,
     ) -> Option<LocalRepositoryEntry> {
         log::info!("insert git repository for {dot_git_path:?}");
-        let work_dir_id = self
-            .snapshot
-            .entry_for_path(work_directory.path_key().0)
-            .map(|entry| entry.id)?;
+        let work_dir_entry = self.snapshot.entry_for_path(work_directory.path_key().0)?;
+        let work_directory_abs_path = self.snapshot.absolutize(&work_dir_entry.path).log_err()?;
 
-        if self.snapshot.git_repositories.get(&work_dir_id).is_some() {
+        if self
+            .snapshot
+            .git_repositories
+            .get(&work_dir_entry.id)
+            .is_some()
+        {
             log::info!("existing git repository for {work_directory:?}");
             return None;
         }
@@ -3593,10 +3681,12 @@ impl BackgroundScannerState {
             );
         }
 
+        let work_directory_id = work_dir_entry.id;
         self.snapshot.repositories.insert_or_replace(
             RepositoryEntry {
-                work_directory_id: work_dir_id,
+                work_directory_id,
                 work_directory: work_directory.clone(),
+                work_directory_abs_path,
                 current_branch: None,
                 statuses_by_path: Default::default(),
                 current_merge_conflicts: Default::default(),
@@ -3605,7 +3695,7 @@ impl BackgroundScannerState {
         );
 
         let local_repository = LocalRepositoryEntry {
-            work_directory_id: work_dir_id,
+            work_directory_id,
             work_directory: work_directory.clone(),
             git_dir_scan_id: 0,
             status_scan_id: 0,
@@ -3618,7 +3708,7 @@ impl BackgroundScannerState {
 
         self.snapshot
             .git_repositories
-            .insert(work_dir_id, local_repository.clone());
+            .insert(work_directory_id, local_repository.clone());
 
         log::info!("inserting new local git repository");
         Some(local_repository)
diff --git a/crates/worktree/src/worktree_tests.rs b/crates/worktree/src/worktree_tests.rs
index c838463ae7b9f3e2279c022042224c53fed50b67..5e0e249364d9c47ebc434c76761ab83edb1a77d2 100644
--- a/crates/worktree/src/worktree_tests.rs
+++ b/crates/worktree/src/worktree_tests.rs
@@ -18,6 +18,7 @@ use parking_lot::Mutex;
 use postage::stream::Stream;
 use pretty_assertions::assert_eq;
 use rand::prelude::*;
+use rpc::proto::WorktreeRelatedMessage;
 use serde_json::json;
 use settings::{Settings, SettingsStore};
 use std::{
@@ -1748,7 +1749,12 @@ async fn test_random_worktree_operations_during_initial_scan(
     for (i, snapshot) in snapshots.into_iter().enumerate().rev() {
         let mut updated_snapshot = snapshot.clone();
         for update in updates.lock().iter() {
-            if update.scan_id >= updated_snapshot.scan_id() as u64 {
+            let scan_id = match update {
+                WorktreeRelatedMessage::UpdateWorktree(update) => update.scan_id,
+                WorktreeRelatedMessage::UpdateRepository(update) => update.scan_id,
+                WorktreeRelatedMessage::RemoveRepository(_) => u64::MAX,
+            };
+            if scan_id >= updated_snapshot.scan_id() as u64 {
                 updated_snapshot
                     .apply_remote_update(update.clone(), &settings.file_scan_inclusions)
                     .unwrap();
@@ -1885,7 +1891,12 @@ async fn test_random_worktree_changes(cx: &mut TestAppContext, mut rng: StdRng) {
     for (i, mut prev_snapshot) in snapshots.into_iter().enumerate().rev() {
         for update in updates.lock().iter() {
-            if update.scan_id >= prev_snapshot.scan_id() as u64 {
+            let scan_id = match update {
+                WorktreeRelatedMessage::UpdateWorktree(update) => update.scan_id,
+                WorktreeRelatedMessage::UpdateRepository(update) => update.scan_id,
+                WorktreeRelatedMessage::RemoveRepository(_) => u64::MAX,
+            };
+            if scan_id >= prev_snapshot.scan_id() as u64 {
                 prev_snapshot
                     .apply_remote_update(update.clone(), &settings.file_scan_inclusions)
.unwrap();
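On the guest side, all three message kinds funnel into a single entry point, `Snapshot::apply_remote_update`, which dispatches on the message variant. A simplified model of that dispatch follows; the hand-rolled `WorktreeRelatedMessage` and `Snapshot` types here are illustrative stand-ins with just enough state to show the shape, not the real proto or worktree definitions.

```rust
use std::collections::HashMap;

// Illustrative stand-ins for proto::UpdateWorktree / UpdateRepository /
// RemoveRepository, reduced to the fields needed to show the dispatch.
#[derive(Clone, Debug)]
enum WorktreeRelatedMessage {
    UpdateWorktree { scan_id: u64 },
    UpdateRepository { id: u64, branch: Option<String> },
    RemoveRepository { id: u64 },
}

#[derive(Default, Debug)]
struct Snapshot {
    scan_id: u64,
    // Repository id -> current branch, standing in for RepositoryEntry state.
    repositories: HashMap<u64, Option<String>>,
}

impl Snapshot {
    // Single entry point, mirroring the shape of Snapshot::apply_remote_update:
    // one match that routes each message kind to its own handler.
    fn apply_remote_update(&mut self, update: WorktreeRelatedMessage) {
        match update {
            WorktreeRelatedMessage::UpdateWorktree { scan_id } => {
                self.scan_id = scan_id;
            }
            WorktreeRelatedMessage::UpdateRepository { id, branch } => {
                // Insert-or-update, analogous to insert_or_replace / update
                // on the repositories SumTree.
                self.repositories.insert(id, branch);
            }
            WorktreeRelatedMessage::RemoveRepository { id } => {
                self.repositories.remove(&id);
            }
        }
    }
}

fn main() {
    let mut snapshot = Snapshot::default();
    let updates = vec![
        WorktreeRelatedMessage::UpdateWorktree { scan_id: 1 },
        WorktreeRelatedMessage::UpdateRepository { id: 42, branch: Some("main".into()) },
        WorktreeRelatedMessage::RemoveRepository { id: 42 },
    ];
    for update in updates {
        snapshot.apply_remote_update(update);
    }
    println!("{snapshot:?}");
}
```

Keeping worktree and repository updates as separate variants of one stream preserves ordering guarantees while letting each kind carry its own scan id, which is what the randomized worktree tests above exercise.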