diff --git a/Cargo.lock b/Cargo.lock index 3c6fb43bf719f112d050aa37986abd1e255999c7..b96bcce58167661d73830530f0cdcb414dc8b6e5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -261,9 +261,9 @@ checksum = "2f23d769dbf1838d5df5156e7b1ad404f4c463d1ac2c6aeb6cd943630f8a8400" dependencies = [ "futures-core", "futures-io", - "rustls", - "webpki", - "webpki-roots", + "rustls 0.19.1", + "webpki 0.21.4", + "webpki-roots 0.21.1", ] [[package]] @@ -293,9 +293,9 @@ dependencies = [ [[package]] name = "atoi" -version = "0.4.0" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "616896e05fc0e2649463a93a15183c6a16bf03413a7af88ef1285ddedfa9cda5" +checksum = "d7c57d12312ff59c811c0643f4d80830505833c9ffaebd193d819392b265be8e" dependencies = [ "num-traits", ] @@ -446,12 +446,6 @@ dependencies = [ "rustc-demangle", ] -[[package]] -name = "base-x" -version = "0.2.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc19a4937b4fbd3fe3379793130e42060d10627a360f2127802b10b87e7baf74" - [[package]] name = "base64" version = "0.12.3" @@ -670,7 +664,7 @@ dependencies = [ "postage", "settings", "theme", - "time 0.3.9", + "time 0.3.10", "util", "workspace", ] @@ -803,7 +797,7 @@ dependencies = [ "smol", "sum_tree", "thiserror", - "time 0.3.9", + "time 0.3.10", "tiny_http", "url", "util", @@ -893,7 +887,7 @@ dependencies = [ "sha-1 0.9.8", "sqlx", "theme", - "time 0.2.27", + "time 0.3.10", "tokio", "tokio-tungstenite", "toml", @@ -946,12 +940,6 @@ dependencies = [ "cache-padded", ] -[[package]] -name = "const_fn" -version = "0.4.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbdcdcb6d86f71c5e97409ad45898af11cbc995b4ee8112d59095a28d376c935" - [[package]] name = "contacts_panel" version = "0.1.0" @@ -1056,18 +1044,18 @@ dependencies = [ [[package]] name = "crc" -version = "2.1.0" +version = "3.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49fc9a695bca7f35f5f4c15cddc84415f66a74ea78eef08e90c5024f2b540e23" +checksum = "53757d12b596c16c78b83458d732a5d1a17ab3f53f2f7412f6fb57cc8a140ab3" dependencies = [ "crc-catalog", ] [[package]] name = "crc-catalog" -version = "1.1.1" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ccaeedb56da03b09f598226e25e80088cb4cd25f316e6e4df7d695f0feeb1403" +checksum = "2d0165d2900ae6778e36e80bbc4da3b5eefccee9ba939761f9c2882a5d9af3ff" [[package]] name = "crc32fast" @@ -1340,12 +1328,6 @@ dependencies = [ "winapi 0.3.9", ] -[[package]] -name = "discard" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0" - [[package]] name = "dotenv" version = "0.15.0" @@ -1930,7 +1912,7 @@ dependencies = [ "smallvec", "smol", "sum_tree", - "time 0.3.9", + "time 0.3.10", "tiny-skia", "tree-sitter", "usvg", @@ -1971,17 +1953,23 @@ name = "hashbrown" version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" + +[[package]] +name = "hashbrown" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db0d4cf898abf0081f964436dc980e96670a0f36863e4b83aaacdb65c9d7ccc3" dependencies = [ "ahash", ] [[package]] name = "hashlink" -version = "0.7.0" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7249a3129cbc1ffccd74857f81464a323a152173cdb134e0fd81bc803b29facf" +checksum = "d452c155cb93fecdfb02a73dd57b5d8e442c2063bd7aac72f1bc5e4263a43086" dependencies = [ - "hashbrown", + "hashbrown 0.12.1", ] [[package]] @@ -2229,7 +2217,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6012d540c5baa3589337a98ce73408de9b5a25ec9fc2c6fd6be8f0d39e0ca5a" dependencies = [ "autocfg 1.1.0", - "hashbrown", + "hashbrown 0.11.2", ] [[package]] @@ -2271,7 +2259,7 @@ dependencies = [ "rand 0.7.3", "serde", "tempfile", - "uuid", + "uuid 0.8.2", "winapi 0.3.9", ] @@ -3120,7 +3108,7 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39fe46acc5503595e5949c17b818714d26fdf9b4920eacf3b2947f0199f4a6ff" dependencies = [ - "rustc_version 0.3.3", + "rustc_version", ] [[package]] @@ -3250,7 +3238,7 @@ dependencies = [ "indexmap", "line-wrap", "serde", - "time 0.3.9", + "time 0.3.10", "xml-rs", ] @@ -3331,12 +3319,6 @@ dependencies = [ "version_check", ] -[[package]] -name = "proc-macro-hack" -version = "0.5.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" - [[package]] name = "proc-macro2" version = "1.0.39" @@ -3927,22 +3909,13 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" -[[package]] -name = "rustc_version" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a" -dependencies = [ - "semver 0.9.0", -] - [[package]] name = "rustc_version" version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0dfe2087c51c460008730de8b57e6a320782fbfb312e1f4d520e6c6fae155ee" dependencies = [ - "semver 0.11.0", + "semver", ] [[package]] @@ -3954,8 +3927,29 @@ dependencies = [ "base64 0.13.0", "log", "ring", - "sct", - "webpki", + "sct 0.6.1", + "webpki 0.21.4", +] + +[[package]] +name = "rustls" +version = "0.20.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aab8ee6c7097ed6057f43c187a62418d0c05a4bd5f18b3571db50ee0f9ce033" +dependencies = [ + "log", + "ring", + "sct 0.7.0", + "webpki 0.22.0", +] + +[[package]] +name = "rustls-pemfile" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7522c9de787ff061458fe9a829dc790a3f5b22dc571694fc5883f448b94d9a9" +dependencies = [ + "base64 0.13.0", ] [[package]] @@ -4083,6 +4077,16 @@ dependencies = [ "untrusted", ] +[[package]] +name = "sct" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "seahash" version = "4.1.0" @@ -4135,30 +4139,15 @@ dependencies = [ "libc", ] -[[package]] -name = "semver" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d7eb9ef2c18661902cc47e535f9bc51b78acd254da71d375c2f6720d9a40403" -dependencies = [ - "semver-parser 0.7.0", -] - [[package]] name = "semver" version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f301af10236f6df4160f7c3f04eec6dbc70ace82d23326abad5edee88801c6b6" dependencies = [ - "semver-parser 0.10.2", + "semver-parser", ] -[[package]] -name = "semver-parser" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" - [[package]] name = "semver-parser" version = "0.10.2" @@ -4321,21 +4310,6 @@ dependencies = [ "digest 0.10.3", ] -[[package]] -name = "sha1" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1da05c97445caa12d05e848c4a4fcbbea29e748ac28f7e80e9b010392063770" -dependencies = [ - "sha1_smol", -] - -[[package]] -name = "sha1_smol" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" - [[package]] name = "sha2" version = "0.9.9" @@ -4526,9 +4500,9 @@ dependencies = [ [[package]] name = "sqlx" -version = "0.5.13" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "551873805652ba0d912fec5bbb0f8b4cdd96baf8e2ebf5970e5671092966019b" +checksum = "1f82cbe94f41641d6c410ded25bbf5097c240cefdf8e3b06d04198d0a96af6a4" dependencies = [ "sqlx-core", "sqlx-macros", @@ -4536,9 +4510,9 @@ dependencies = [ [[package]] name = "sqlx-core" -version = "0.5.13" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e48c61941ccf5ddcada342cd59e3e5173b007c509e1e8e990dafc830294d9dc5" +checksum = "6b69bf218860335ddda60d6ce85ee39f6cf6e5630e300e19757d1de15886a093" dependencies = [ "ahash", "atoi", @@ -4569,7 +4543,8 @@ dependencies = [ "paste", "percent-encoding", "rand 0.8.5", - "rustls", + "rustls 0.20.6", + "rustls-pemfile", "serde", "serde_json", "sha-1 0.10.0", @@ -4579,20 +4554,19 @@ dependencies = [ "sqlx-rt", "stringprep", "thiserror", - "time 0.2.27", + "time 0.3.10", "tokio-stream", "url", - "uuid", - "webpki", - "webpki-roots", + "uuid 1.1.2", + "webpki-roots 0.22.3", "whoami", ] [[package]] name = "sqlx-macros" -version = "0.5.13" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc0fba2b0cae21fc00fe6046f8baa4c7fcb49e379f0f592b04696607f69ed2e1" +checksum = "f40c63177cf23d356b159b60acd27c54af7423f1736988502e36bae9a712118f" dependencies = [ "dotenv", "either", @@ -4609,79 +4583,21 @@ dependencies = [ [[package]] name = "sqlx-rt" -version = "0.5.13" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4db708cd3e459078f85f39f96a00960bd841f66ee2a669e90bf36907f5a79aae" +checksum = "874e93a365a598dc3dadb197565952cb143ae4aa716f7bcc933a8d836f6bf89f" dependencies = [ "once_cell", "tokio", "tokio-rustls", ] -[[package]] -name = "standback" -version = "0.2.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e113fb6f3de07a243d434a56ec6f186dfd51cb08448239fe7bcae73f87ff28ff" -dependencies = [ - "version_check", -] - [[package]] name = "static_assertions" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" -[[package]] -name = "stdweb" -version = "0.4.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d022496b16281348b52d0e30ae99e01a73d737b2f45d38fed4edf79f9325a1d5" -dependencies = [ - "discard", - "rustc_version 0.2.3", - "stdweb-derive", - "stdweb-internal-macros", - "stdweb-internal-runtime", - "wasm-bindgen", -] - -[[package]] -name = "stdweb-derive" -version = "0.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c87a60a40fccc84bef0652345bbbbbe20a605bf5d0ce81719fc476f5c03b50ef" -dependencies = [ - "proc-macro2", - "quote", - "serde", - "serde_derive", - "syn", -] - -[[package]] -name = "stdweb-internal-macros" -version = "0.2.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58fa5ff6ad0d98d1ffa8cb115892b6e69d67799f6763e162a1c9db421dc22e11" -dependencies = [ - "base-x", - "proc-macro2", - "quote", - "serde", - "serde_derive", - "serde_json", - "sha1", - "syn", -] - -[[package]] -name = "stdweb-internal-runtime" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "213701ba3370744dcd1a12960caa4843b3d68b4d1c0a5d575e0d65b2ee9d16c0" - [[package]] name = "stringprep" version = "0.1.2" @@ -4945,52 +4861,22 @@ dependencies = [ [[package]] name = "time" -version = "0.2.27" +version = "0.3.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4752a97f8eebd6854ff91f1c1824cd6160626ac4bd44287f7f4ea2035a02a242" -dependencies = [ - "const_fn", - "libc", - "standback", - "stdweb", - "time-macros", - "version_check", - "winapi 0.3.9", -] - -[[package]] -name = "time" -version = "0.3.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2702e08a7a860f005826c6815dcac101b19b5eb330c27fe4a5928fec1d20ddd" +checksum = "82501a4c1c0330d640a6e176a3d6a204f5ec5237aca029029d21864a902e27b0" dependencies = [ "itoa", "libc", "num_threads", + "serde", + "time-macros", ] [[package]] name = "time-macros" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "957e9c6e26f12cb6d0dd7fc776bb67a706312e7299aed74c8dd5b17ebb27e2f1" -dependencies = [ - "proc-macro-hack", - "time-macros-impl", -] - -[[package]] -name = "time-macros-impl" -version = "0.1.2" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd3c141a1b43194f3f56a1411225df8646c55781d5f26db825b3d98507eb482f" -dependencies = [ - "proc-macro-hack", - "proc-macro2", - "quote", - "standback", - "syn", -] +checksum = "42657b1a6f4d817cda8e7a0ace261fe0cc946cf3a80314390b22cc61ae080792" [[package]] name = "tiny-skia" @@ -5087,13 +4973,13 @@ dependencies = [ [[package]] name = "tokio-rustls" -version = "0.22.0" +version = "0.23.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6" +checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" dependencies = [ - "rustls", + "rustls 0.20.6", "tokio", - "webpki", + "webpki 0.22.0", ] [[package]] @@ -5651,6 +5537,12 @@ dependencies = [ "getrandom 0.2.6", ] +[[package]] +name = "uuid" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd6469f4314d5f1ffec476e05f17cc9a78bc7a27a6a857842170bdf8d6f98d2f" + [[package]] name = "valuable" version = "0.1.0" @@ -5839,13 +5731,32 @@ dependencies = [ "untrusted", ] +[[package]] +name = "webpki" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "webpki-roots" version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aabe153544e473b775453675851ecc86863d2a81d786d741f6b76778f2a48940" dependencies = [ - "webpki", + "webpki 0.21.4", +] + +[[package]] +name = "webpki-roots" +version = "0.22.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44d8de8415c823c8abd270ad483c6feeac771fad964890779f9a8cb24fbbc1bf" +dependencies = [ + "webpki 0.22.0", ] [[package]] diff --git a/crates/chat_panel/Cargo.toml b/crates/chat_panel/Cargo.toml index e54245502ffa91843b4b7510066c41275cee096b..3f3a2651b0e63022d21b66bcdeb62ca981d90a55 100644 --- a/crates/chat_panel/Cargo.toml +++ b/crates/chat_panel/Cargo.toml @@ -17,4 +17,4 @@ theme = { path = "../theme" } util = { path = "../util" } workspace = { path = "../workspace" } postage = { version = "0.4.1", features = ["futures-traits"] } -time = "0.3" +time = { version = "0.3", features = ["serde", "serde-well-known"] } diff --git a/crates/client/Cargo.toml b/crates/client/Cargo.toml index 45e8547c2c0bd40705e6694c8c4af45ec67ed5c9..a7888b8965319e120f540ad8b296567dcb6b0e0b 100644 --- a/crates/client/Cargo.toml +++ b/crates/client/Cargo.toml @@ -29,7 +29,7 @@ postage = { version = "0.4.1", features = ["futures-traits"] } rand = "0.8.3" smol = "1.2.5" thiserror = "1.0.29" -time = "0.3" +time = { version = "0.3", features = ["serde", "serde-well-known"] } tiny_http = "0.8" url = "2.2" diff --git a/crates/collab/Cargo.toml b/crates/collab/Cargo.toml index 4f2b505aacfa5af3961f62c63936695f8178ba82..9b3603e6e43e29acb6071a9c7670d07d412a91ba 100644 --- a/crates/collab/Cargo.toml +++ b/crates/collab/Cargo.toml @@ -38,7 +38,7 @@ scrypt = "0.7" serde = { version = "1.0", features = ["derive", "rc"] } serde_json = "1.0" sha-1 = "0.9" -time = "0.2" +time = { version = "0.3", features = ["serde", "serde-well-known"] } tokio = { version = "1", features = ["full"] } tokio-tungstenite = "0.17" tonic = "0.6" @@ -49,7 +49,7 @@ tracing-log = "0.1.3" tracing-subscriber = { version = "0.3.11", features = ["env-filter", "json"] } [dependencies.sqlx] -version = "0.5.2" +version = "0.6" features = ["runtime-tokio-rustls", "postgres", "time", "uuid"] [dev-dependencies] diff --git a/crates/collab/migrations/20220620211403_create_projects.sql b/crates/collab/migrations/20220620211403_create_projects.sql new file mode 100644 index 0000000000000000000000000000000000000000..d813c9f7a1811e50227c312c66df0fd679c35166 --- /dev/null +++ b/crates/collab/migrations/20220620211403_create_projects.sql @@ -0,0 +1,24 @@ +CREATE TABLE IF NOT EXISTS "projects" ( + "id" SERIAL PRIMARY KEY, + "host_user_id" INTEGER REFERENCES users (id) NOT NULL, + "unregistered" BOOLEAN NOT NULL DEFAULT false +); + +CREATE TABLE IF NOT EXISTS "worktree_extensions" ( + "id" SERIAL PRIMARY KEY, + "project_id" INTEGER REFERENCES projects (id) NOT NULL, + "worktree_id" INTEGER NOT NULL, + "extension" VARCHAR(255), + "count" INTEGER NOT NULL +); + +CREATE TABLE IF NOT EXISTS "project_activity_periods" ( + "id" SERIAL PRIMARY KEY, + "duration_millis" INTEGER NOT NULL, + "ended_at" TIMESTAMP NOT NULL, + "user_id" INTEGER REFERENCES users (id) NOT NULL, + "project_id" INTEGER REFERENCES projects (id) NOT NULL +); + +CREATE INDEX "index_project_activity_periods_on_ended_at" ON "project_activity_periods" ("ended_at"); +CREATE UNIQUE INDEX "index_worktree_extensions_on_project_id_and_worktree_id_and_extension" ON "worktree_extensions" ("project_id", "worktree_id", "extension"); \ No newline at end of file diff --git a/crates/collab/src/api.rs b/crates/collab/src/api.rs index 993e32e445f383deb752161be76bc733ddd46f81..6440e8cb3048b0709b5efe64f9b1089ed74930bf 100644 --- a/crates/collab/src/api.rs +++ b/crates/collab/src/api.rs @@ -1,6 +1,6 @@ use crate::{ auth, - db::{User, UserId}, + db::{ProjectId, User, UserId}, rpc::{self, ResultExt}, AppState, Error, Result, }; @@ -16,7 +16,9 @@ use axum::{ }; use axum_extra::response::ErasedJson; use serde::{Deserialize, Serialize}; +use serde_json::json; use std::sync::Arc; +use time::OffsetDateTime; use tower::ServiceBuilder; use tracing::instrument; @@ -32,6 +34,11 @@ pub fn routes(rpc_server: &Arc, state: Arc) -> Router, + Extension(app): Extension>, +) -> Result { + let summary = app + .db + .summarize_project_activity(params.start..params.end, 100) + .await?; + Ok(ErasedJson::pretty(summary)) +} + +#[derive(Deserialize)] +struct GetProjectMetadataParams { + project_id: u64, +} + +async fn get_project_metadata( + Query(params): Query, + Extension(app): Extension>, +) -> Result { + let extensions = app + .db + .get_project_extensions(ProjectId::from_proto(params.project_id)) + .await?; + Ok(ErasedJson::pretty(json!({ "extensions": extensions }))) +} + #[derive(Deserialize)] struct CreateAccessTokenQueryParams { public_key: String, diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index c1d5506491c6f05c7bf89c1cd7efac4564f3a5af..e4555ed856f5bfbd3fc8a6cc7b63a7b8a1f06599 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -1,7 +1,10 @@ +use std::{ops::Range, time::Duration}; + use crate::{Error, Result}; use anyhow::{anyhow, Context}; use async_trait::async_trait; use axum::http::StatusCode; +use collections::HashMap; use futures::StreamExt; use nanoid::nanoid; use serde::Serialize; @@ -37,6 +40,42 @@ pub trait Db: Send + Sync { email_address: Option<&str>, ) -> Result; + /// Registers a new project for the given user. + async fn register_project(&self, host_user_id: UserId) -> Result; + + /// Unregisters a project for the given project id. + async fn unregister_project(&self, project_id: ProjectId) -> Result<()>; + + /// Update file counts by extension for the given project and worktree. + async fn update_worktree_extensions( + &self, + project_id: ProjectId, + worktree_id: u64, + extensions: HashMap, + ) -> Result<()>; + + /// Get the file counts on the given project keyed by their worktree and extension. + async fn get_project_extensions( + &self, + project_id: ProjectId, + ) -> Result>>; + + /// Record which users have been active in which projects during + /// a given period of time. + async fn record_project_activity( + &self, + time_period: Range, + active_projects: &[(UserId, ProjectId)], + ) -> Result<()>; + + /// Get the users that have been most active during the given time period, + /// along with the amount of time they have been active in each project. + async fn summarize_project_activity( + &self, + time_period: Range, + max_user_count: usize, + ) -> Result>; + async fn get_contacts(&self, id: UserId) -> Result>; async fn has_contact(&self, user_id_a: UserId, user_id_b: UserId) -> Result; async fn send_contact_request(&self, requester_id: UserId, responder_id: UserId) -> Result<()>; @@ -145,12 +184,12 @@ impl Db for PostgresDb { async fn get_all_users(&self, page: u32, limit: u32) -> Result> { let query = "SELECT * FROM users ORDER BY github_login ASC LIMIT $1 OFFSET $2"; Ok(sqlx::query_as(query) - .bind(limit) - .bind(page * limit) + .bind(limit as i32) + .bind((page * limit) as i32) .fetch_all(&self.pool) .await?) } - + async fn create_users(&self, users: Vec<(String, String, usize)>) -> Result> { let mut query = QueryBuilder::new( "INSERT INTO users (github_login, email_address, admin, invite_code, invite_count)", @@ -163,7 +202,7 @@ impl Db for PostgresDb { .push_bind(email_address) .push_bind(false) .push_bind(nanoid!(16)) - .push_bind(invite_count as u32); + .push_bind(invite_count as i32); }, ); query.push( @@ -198,7 +237,7 @@ impl Db for PostgresDb { Ok(sqlx::query_as(query) .bind(like_string) .bind(name_query) - .bind(limit) + .bind(limit as i32) .fetch_all(&self.pool) .await?) } @@ -289,7 +328,7 @@ impl Db for PostgresDb { WHERE id = $2 ", ) - .bind(count) + .bind(count as i32) .bind(id) .execute(&mut tx) .await?; @@ -411,6 +450,178 @@ impl Db for PostgresDb { Ok(invitee_id) } + // projects + + async fn register_project(&self, host_user_id: UserId) -> Result { + Ok(sqlx::query_scalar( + " + INSERT INTO projects(host_user_id) + VALUES ($1) + RETURNING id + ", + ) + .bind(host_user_id) + .fetch_one(&self.pool) + .await + .map(ProjectId)?) + } + + async fn unregister_project(&self, project_id: ProjectId) -> Result<()> { + sqlx::query( + " + UPDATE projects + SET unregistered = 't' + WHERE id = $1 + ", + ) + .bind(project_id) + .execute(&self.pool) + .await?; + Ok(()) + } + + async fn update_worktree_extensions( + &self, + project_id: ProjectId, + worktree_id: u64, + extensions: HashMap, + ) -> Result<()> { + let mut query = QueryBuilder::new( + "INSERT INTO worktree_extensions (project_id, worktree_id, extension, count)", + ); + query.push_values(extensions, |mut query, (extension, count)| { + query + .push_bind(project_id) + .push_bind(worktree_id as i32) + .push_bind(extension) + .push_bind(count as i32); + }); + query.push( + " + ON CONFLICT (project_id, worktree_id, extension) DO UPDATE SET + count = excluded.count + ", + ); + query.build().execute(&self.pool).await?; + + Ok(()) + } + + async fn get_project_extensions( + &self, + project_id: ProjectId, + ) -> Result>> { + #[derive(Clone, Debug, Default, FromRow, Serialize, PartialEq)] + struct WorktreeExtension { + worktree_id: i32, + extension: String, + count: i32, + } + + let query = " + SELECT worktree_id, extension, count + FROM worktree_extensions + WHERE project_id = $1 + "; + let counts = sqlx::query_as::<_, WorktreeExtension>(query) + .bind(&project_id) + .fetch_all(&self.pool) + .await?; + + let mut extension_counts = HashMap::default(); + for count in counts { + extension_counts + .entry(count.worktree_id as u64) + .or_insert(HashMap::default()) + .insert(count.extension, count.count as usize); + } + Ok(extension_counts) + } + + async fn record_project_activity( + &self, + time_period: Range, + projects: &[(UserId, ProjectId)], + ) -> Result<()> { + let query = " + INSERT INTO project_activity_periods + (ended_at, duration_millis, user_id, project_id) + VALUES + ($1, $2, $3, $4); + "; + + let mut tx = self.pool.begin().await?; + let duration_millis = + ((time_period.end - time_period.start).as_seconds_f64() * 1000.0) as i32; + for (user_id, project_id) in projects { + sqlx::query(query) + .bind(time_period.end) + .bind(duration_millis) + .bind(user_id) + .bind(project_id) + .execute(&mut tx) + .await?; + } + tx.commit().await?; + + Ok(()) + } + + async fn summarize_project_activity( + &self, + time_period: Range, + max_user_count: usize, + ) -> Result> { + let query = " + WITH + project_durations AS ( + SELECT user_id, project_id, SUM(duration_millis) AS project_duration + FROM project_activity_periods + WHERE $1 <= ended_at AND ended_at <= $2 + GROUP BY user_id, project_id + ), + user_durations AS ( + SELECT user_id, SUM(project_duration) as total_duration + FROM project_durations + GROUP BY user_id + ORDER BY total_duration DESC + LIMIT $3 + ) + SELECT user_durations.user_id, users.github_login, project_id, project_duration + FROM user_durations, project_durations, users + WHERE + user_durations.user_id = project_durations.user_id AND + user_durations.user_id = users.id + ORDER BY user_id ASC, project_duration DESC + "; + + let mut rows = sqlx::query_as::<_, (UserId, String, ProjectId, i64)>(query) + .bind(time_period.start) + .bind(time_period.end) + .bind(max_user_count as i32) + .fetch(&self.pool); + + let mut result = Vec::::new(); + while let Some(row) = rows.next().await { + let (user_id, github_login, project_id, duration_millis) = row?; + let project_id = project_id; + let duration = Duration::from_millis(duration_millis as u64); + if let Some(last_summary) = result.last_mut() { + if last_summary.id == user_id { + last_summary.project_activity.push((project_id, duration)); + continue; + } + } + result.push(UserActivitySummary { + id: user_id, + project_activity: vec![(project_id, duration)], + github_login, + }); + } + + Ok(result) + } + // contacts async fn get_contacts(&self, user_id: UserId) -> Result> { @@ -651,7 +862,7 @@ impl Db for PostgresDb { sqlx::query(cleanup_query) .bind(user_id.0) .bind(access_token_hash) - .bind(max_access_token_count as u32) + .bind(max_access_token_count as i32) .execute(&mut tx) .await?; Ok(tx.commit().await?) @@ -927,6 +1138,21 @@ pub struct User { pub connected_once: bool, } +id_type!(ProjectId); +#[derive(Clone, Debug, Default, FromRow, Serialize, PartialEq)] +pub struct Project { + pub id: ProjectId, + pub host_user_id: UserId, + pub unregistered: bool, +} + +#[derive(Clone, Debug, PartialEq, Serialize)] +pub struct UserActivitySummary { + pub id: UserId, + pub github_login: String, + pub project_activity: Vec<(ProjectId, Duration)>, +} + id_type!(OrgId); #[derive(FromRow)] pub struct Org { @@ -1125,6 +1351,94 @@ pub mod tests { assert_ne!(invite_code_4, invite_code_3); } + #[tokio::test(flavor = "multi_thread")] + async fn test_project_activity() { + let test_db = TestDb::postgres().await; + let db = test_db.db(); + + let user_1 = db.create_user("user_1", None, false).await.unwrap(); + let user_2 = db.create_user("user_2", None, false).await.unwrap(); + let user_3 = db.create_user("user_3", None, false).await.unwrap(); + let project_1 = db.register_project(user_1).await.unwrap(); + let project_2 = db.register_project(user_2).await.unwrap(); + let t0 = OffsetDateTime::now_utc() - Duration::from_secs(60 * 60); + + // User 2 opens a project + let t1 = t0 + Duration::from_secs(10); + db.record_project_activity(t0..t1, &[(user_2, project_2)]) + .await + .unwrap(); + + let t2 = t1 + Duration::from_secs(10); + db.record_project_activity(t1..t2, &[(user_2, project_2)]) + .await + .unwrap(); + + // User 1 joins the project + let t3 = t2 + Duration::from_secs(10); + db.record_project_activity(t2..t3, &[(user_2, project_2), (user_1, project_2)]) + .await + .unwrap(); + + // User 1 opens another project + let t4 = t3 + Duration::from_secs(10); + db.record_project_activity( + t3..t4, + &[ + (user_2, project_2), + (user_1, project_2), + (user_1, project_1), + ], + ) + .await + .unwrap(); + + // User 3 joins that project + let t5 = t4 + Duration::from_secs(10); + db.record_project_activity( + t4..t5, + &[ + (user_2, project_2), + (user_1, project_2), + (user_1, project_1), + (user_3, project_1), + ], + ) + .await + .unwrap(); + + // User 2 leaves + let t6 = t5 + Duration::from_secs(5); + db.record_project_activity(t5..t6, &[(user_1, project_1), (user_3, project_1)]) + .await + .unwrap(); + + let summary = db.summarize_project_activity(t0..t6, 10).await.unwrap(); + assert_eq!( + summary, + &[ + UserActivitySummary { + id: user_1, + github_login: "user_1".to_string(), + project_activity: vec![ + (project_2, Duration::from_secs(30)), + (project_1, Duration::from_secs(25)) + ] + }, + UserActivitySummary { + id: user_2, + github_login: "user_2".to_string(), + project_activity: vec![(project_2, Duration::from_secs(50))] + }, + UserActivitySummary { + id: user_3, + github_login: "user_3".to_string(), + project_activity: vec![(project_1, Duration::from_secs(15))] + }, + ] + ); + } + #[tokio::test(flavor = "multi_thread")] async fn test_recent_channel_messages() { for test_db in [ @@ -1696,6 +2010,8 @@ pub mod tests { pub struct FakeDb { background: Arc, pub users: Mutex>, + pub projects: Mutex>, + pub worktree_extensions: Mutex>, pub orgs: Mutex>, pub org_memberships: Mutex>, pub channels: Mutex>, @@ -1706,6 +2022,7 @@ pub mod tests { next_user_id: Mutex, next_org_id: Mutex, next_channel_id: Mutex, + next_project_id: Mutex, } #[derive(Debug)] @@ -1722,6 +2039,9 @@ pub mod tests { background, users: Default::default(), next_user_id: Mutex::new(1), + projects: Default::default(), + worktree_extensions: Default::default(), + next_project_id: Mutex::new(1), orgs: Default::default(), next_org_id: Mutex::new(1), org_memberships: Default::default(), @@ -1841,6 +2161,78 @@ pub mod tests { unimplemented!() } + // projects + + async fn register_project(&self, host_user_id: UserId) -> Result { + self.background.simulate_random_delay().await; + if !self.users.lock().contains_key(&host_user_id) { + Err(anyhow!("no such user"))?; + } + + let project_id = ProjectId(post_inc(&mut *self.next_project_id.lock())); + self.projects.lock().insert( + project_id, + Project { + id: project_id, + host_user_id, + unregistered: false, + }, + ); + Ok(project_id) + } + + async fn unregister_project(&self, project_id: ProjectId) -> Result<()> { + self.projects + .lock() + .get_mut(&project_id) + .ok_or_else(|| anyhow!("no such project"))? + .unregistered = true; + Ok(()) + } + + async fn update_worktree_extensions( + &self, + project_id: ProjectId, + worktree_id: u64, + extensions: HashMap, + ) -> Result<()> { + self.background.simulate_random_delay().await; + if !self.projects.lock().contains_key(&project_id) { + Err(anyhow!("no such project"))?; + } + + for (extension, count) in extensions { + self.worktree_extensions + .lock() + .insert((project_id, worktree_id, extension), count); + } + + Ok(()) + } + + async fn get_project_extensions( + &self, + _project_id: ProjectId, + ) -> Result>> { + unimplemented!() + } + + async fn record_project_activity( + &self, + _period: Range, + _active_projects: &[(UserId, ProjectId)], + ) -> Result<()> { + unimplemented!() + } + + async fn summarize_project_activity( + &self, + _period: Range, + _limit: usize, + ) -> Result> { + unimplemented!() + } + // contacts async fn get_contacts(&self, id: UserId) -> Result> { diff --git a/crates/collab/src/integration_tests.rs b/crates/collab/src/integration_tests.rs index 9f20880447a34e5136725ecf07a72e754c4acdbc..47000d0bdb76561bbd1febd5f0c0e737e6366be1 100644 --- a/crates/collab/src/integration_tests.rs +++ b/crates/collab/src/integration_tests.rs @@ -1,5 +1,5 @@ use crate::{ - db::{tests::TestDb, UserId}, + db::{tests::TestDb, ProjectId, UserId}, rpc::{Executor, Server, Store}, AppState, }; @@ -1447,7 +1447,7 @@ async fn test_collaborating_with_diagnostics( deterministic.run_until_parked(); { let store = server.store.read().await; - let project = store.project(project_id).unwrap(); + let project = store.project(ProjectId::from_proto(project_id)).unwrap(); let worktree = project.worktrees.get(&worktree_id.to_proto()).unwrap(); assert!(!worktree.diagnostic_summaries.is_empty()); } @@ -4722,7 +4722,7 @@ impl TestServer { foreground: Rc, background: Arc, ) -> Self { - let test_db = TestDb::fake(background); + let test_db = TestDb::fake(background.clone()); let app_state = Self::build_app_state(&test_db).await; let peer = Peer::new(); let notifications = mpsc::unbounded(); diff --git a/crates/collab/src/main.rs b/crates/collab/src/main.rs index f8a9fedb66fd9487e8caed5cd3914689e8de6952..2c2c6a94f4dff18c6a6bd18fe501971d9831bb1f 100644 --- a/crates/collab/src/main.rs +++ b/crates/collab/src/main.rs @@ -14,6 +14,7 @@ use serde::Deserialize; use std::{ net::{SocketAddr, TcpListener}, sync::Arc, + time::Duration, }; use tracing_log::LogTracer; use tracing_subscriber::{filter::EnvFilter, fmt::format::JsonFields, Layer}; @@ -66,6 +67,8 @@ async fn main() -> Result<()> { .expect("failed to bind TCP listener"); let rpc_server = rpc::Server::new(state.clone(), None); + rpc_server.start_recording_project_activity(Duration::from_secs(5 * 60), rpc::RealExecutor); + let app = Router::::new() .merge(api::routes(&rpc_server, state.clone())) .merge(rpc::routes(rpc_server)); diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index e244757970c6973e08f893e549e5f1251159b644..7ce0fd840af969eecea5a7ec5c1b49ae9825a349 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -2,7 +2,7 @@ mod store; use crate::{ auth, - db::{self, ChannelId, MessageId, User, UserId}, + db::{self, ChannelId, MessageId, ProjectId, User, UserId}, AppState, Result, }; use anyhow::anyhow; @@ -288,6 +288,58 @@ impl Server { }) } + /// Start a long lived task that records which users are active in which projects. + pub fn start_recording_project_activity( + self: &Arc, + interval: Duration, + executor: E, + ) { + executor.spawn_detached({ + let this = Arc::downgrade(self); + let executor = executor.clone(); + async move { + let mut period_start = OffsetDateTime::now_utc(); + let mut active_projects = Vec::<(UserId, ProjectId)>::new(); + loop { + let sleep = executor.sleep(interval); + sleep.await; + let this = if let Some(this) = this.upgrade() { + this + } else { + break; + }; + + active_projects.clear(); + active_projects.extend(this.store().await.projects().flat_map( + |(project_id, project)| { + project.guests.values().chain([&project.host]).filter_map( + |collaborator| { + if !collaborator.admin + && collaborator + .last_activity + .map_or(false, |activity| activity > period_start) + { + Some((collaborator.user_id, *project_id)) + } else { + None + } + }, + ) + }, + )); + + let period_end = OffsetDateTime::now_utc(); + this.app_state + .db + .record_project_activity(period_start..period_end, &active_projects) + .await + .trace_err(); + period_start = period_end; + } + } + }); + } + pub fn handle_connection( self: &Arc, connection: Connection, @@ -401,14 +453,21 @@ impl Server { async fn sign_out(self: &mut Arc, connection_id: ConnectionId) -> Result<()> { self.peer.disconnect(connection_id); - let removed_user_id = { + let mut projects_to_unregister = Vec::new(); + let removed_user_id; + { let mut store = self.store_mut().await; let removed_connection = store.remove_connection(connection_id)?; for (project_id, project) in removed_connection.hosted_projects { + projects_to_unregister.push(project_id); broadcast(connection_id, project.guests.keys().copied(), |conn_id| { - self.peer - .send(conn_id, proto::UnregisterProject { project_id }) + self.peer.send( + conn_id, + proto::UnregisterProject { + project_id: project_id.to_proto(), + }, + ) }); for (_, receipts) in project.join_requests { @@ -433,7 +492,7 @@ impl Server { self.peer.send( conn_id, proto::RemoveProjectCollaborator { - project_id, + project_id: project_id.to_proto(), peer_id: connection_id.0, }, ) @@ -442,17 +501,27 @@ impl Server { self.peer .send( project.host_connection_id, - proto::ProjectUnshared { project_id }, + proto::ProjectUnshared { + project_id: project_id.to_proto(), + }, ) .trace_err(); } } } - removed_connection.user_id + removed_user_id = removed_connection.user_id; }; - self.update_user_contacts(removed_user_id).await?; + self.update_user_contacts(removed_user_id).await.trace_err(); + + for project_id in projects_to_unregister { + self.app_state + .db + .unregister_project(project_id) + .await + .trace_err(); + } Ok(()) } @@ -516,14 +585,18 @@ impl Server { request: TypedEnvelope, response: Response, ) -> Result<()> { - let project_id; - { - let mut state = self.store_mut().await; - let user_id = state.user_id_for_connection(request.sender_id)?; - project_id = state.register_project(request.sender_id, user_id); - }; + let user_id = self + .store() + .await + .user_id_for_connection(request.sender_id)?; + let project_id = self.app_state.db.register_project(user_id).await?; + self.store_mut() + .await + .register_project(request.sender_id, project_id)?; - response.send(proto::RegisterProjectResponse { project_id })?; + response.send(proto::RegisterProjectResponse { + project_id: project_id.to_proto(), + })?; Ok(()) } @@ -533,12 +606,13 @@ impl Server { request: TypedEnvelope, response: Response, ) -> Result<()> { + let project_id = ProjectId::from_proto(request.payload.project_id); let (user_id, project) = { let mut state = self.store_mut().await; - let project = - state.unregister_project(request.payload.project_id, request.sender_id)?; + let project = state.unregister_project(project_id, request.sender_id)?; (state.user_id_for_connection(request.sender_id)?, project) }; + self.app_state.db.unregister_project(project_id).await?; broadcast( request.sender_id, @@ -547,7 +621,7 @@ impl Server { self.peer.send( conn_id, proto::UnregisterProject { - project_id: request.payload.project_id, + project_id: project_id.to_proto(), }, ) }, @@ -613,7 +687,7 @@ impl Server { request: TypedEnvelope, response: Response, ) -> Result<()> { - let project_id = request.payload.project_id; + let project_id = ProjectId::from_proto(request.payload.project_id); let host_user_id; let guest_user_id; @@ -621,12 +695,12 @@ impl Server { { let state = self.store().await; let project = state.project(project_id)?; - host_user_id = project.host_user_id; + host_user_id = project.host.user_id; host_connection_id = project.host_connection_id; guest_user_id = state.user_id_for_connection(request.sender_id)?; }; - tracing::info!(project_id, %host_user_id, %host_connection_id, "join project"); + tracing::info!(%project_id, %host_user_id, %host_connection_id, "join project"); let has_contact = self .app_state .db @@ -644,7 +718,7 @@ impl Server { self.peer.send( host_connection_id, proto::RequestJoinProject { - project_id, + project_id: project_id.to_proto(), requester_id: guest_user_id.to_proto(), }, )?; @@ -659,13 +733,13 @@ impl Server { { let mut state = self.store_mut().await; - let project_id = request.payload.project_id; + let project_id = ProjectId::from_proto(request.payload.project_id); let project = state.project(project_id)?; if project.host_connection_id != request.sender_id { Err(anyhow!("no such connection"))?; } - host_user_id = project.host_user_id; + host_user_id = project.host.user_id; let guest_user_id = UserId::from_proto(request.payload.requester_id); if !request.payload.allow { @@ -697,7 +771,7 @@ impl Server { collaborators.push(proto::Collaborator { peer_id: project.host_connection_id.0, replica_id: 0, - user_id: project.host_user_id.to_proto(), + user_id: project.host.user_id.to_proto(), }); let worktrees = project .worktrees @@ -720,15 +794,15 @@ impl Server { .collect::>(); // Add all guests other than the requesting user's own connections as collaborators - for (peer_conn_id, (peer_replica_id, peer_user_id)) in &project.guests { + for (guest_conn_id, guest) in &project.guests { if receipts_with_replica_ids .iter() - .all(|(receipt, _)| receipt.sender_id != *peer_conn_id) + .all(|(receipt, _)| receipt.sender_id != *guest_conn_id) { collaborators.push(proto::Collaborator { - peer_id: peer_conn_id.0, - replica_id: *peer_replica_id as u32, - user_id: peer_user_id.to_proto(), + peer_id: guest_conn_id.0, + replica_id: guest.replica_id as u32, + user_id: guest.user_id.to_proto(), }); } } @@ -739,7 +813,7 @@ impl Server { self.peer.send( conn_id, proto::AddProjectCollaborator { - project_id, + project_id: project_id.to_proto(), collaborator: Some(proto::Collaborator { peer_id: receipt.sender_id.0, replica_id: *replica_id as u32, @@ -777,13 +851,13 @@ impl Server { request: TypedEnvelope, ) -> Result<()> { let sender_id = request.sender_id; - let project_id = request.payload.project_id; + let project_id = ProjectId::from_proto(request.payload.project_id); let project; { let mut store = self.store_mut().await; project = store.leave_project(sender_id, project_id)?; tracing::info!( - project_id, + %project_id, host_user_id = %project.host_user_id, host_connection_id = %project.host_connection_id, "leave project" @@ -794,7 +868,7 @@ impl Server { self.peer.send( conn_id, proto::RemoveProjectCollaborator { - project_id, + project_id: project_id.to_proto(), peer_id: sender_id.0, }, ) @@ -805,7 +879,7 @@ impl Server { self.peer.send( project.host_connection_id, proto::JoinProjectRequestCancelled { - project_id, + project_id: project_id.to_proto(), requester_id: requester_id.to_proto(), }, )?; @@ -814,7 +888,9 @@ impl Server { if project.unshare { self.peer.send( project.host_connection_id, - proto::ProjectUnshared { project_id }, + proto::ProjectUnshared { + project_id: project_id.to_proto(), + }, )?; } } @@ -826,18 +902,15 @@ impl Server { self: Arc, request: TypedEnvelope, ) -> Result<()> { + let project_id = ProjectId::from_proto(request.payload.project_id); let user_id; { let mut state = self.store_mut().await; user_id = state.user_id_for_connection(request.sender_id)?; let guest_connection_ids = state - .read_project(request.payload.project_id, request.sender_id)? + .read_project(project_id, request.sender_id)? .guest_connection_ids(); - state.update_project( - request.payload.project_id, - &request.payload.worktrees, - request.sender_id, - )?; + state.update_project(project_id, &request.payload.worktrees, request.sender_id)?; broadcast(request.sender_id, guest_connection_ids, |connection_id| { self.peer .forward_send(request.sender_id, connection_id, request.payload.clone()) @@ -851,9 +924,10 @@ impl Server { self: Arc, request: TypedEnvelope, ) -> Result<()> { - self.store_mut() - .await - .register_project_activity(request.payload.project_id, request.sender_id)?; + self.store_mut().await.register_project_activity( + ProjectId::from_proto(request.payload.project_id), + request.sender_id, + )?; Ok(()) } @@ -862,28 +936,25 @@ impl Server { request: TypedEnvelope, response: Response, ) -> Result<()> { - let (connection_ids, metadata_changed) = { + let project_id = ProjectId::from_proto(request.payload.project_id); + let worktree_id = request.payload.worktree_id; + let (connection_ids, metadata_changed, extension_counts) = { let mut store = self.store_mut().await; let (connection_ids, metadata_changed, extension_counts) = store.update_worktree( request.sender_id, - request.payload.project_id, - request.payload.worktree_id, + project_id, + worktree_id, &request.payload.root_name, &request.payload.removed_entries, &request.payload.updated_entries, request.payload.scan_id, )?; - for (extension, count) in extension_counts { - tracing::info!( - project_id = request.payload.project_id, - worktree_id = request.payload.worktree_id, - ?extension, - %count, - "worktree updated" - ); - } - (connection_ids, metadata_changed) + (connection_ids, metadata_changed, extension_counts.clone()) }; + self.app_state + .db + .update_worktree_extensions(project_id, worktree_id, extension_counts) + .await?; broadcast(request.sender_id, connection_ids, |connection_id| { self.peer @@ -910,7 +981,7 @@ impl Server { .clone() .ok_or_else(|| anyhow!("invalid summary"))?; let receiver_ids = self.store_mut().await.update_diagnostic_summary( - request.payload.project_id, + ProjectId::from_proto(request.payload.project_id), request.payload.worktree_id, request.sender_id, summary, @@ -928,7 +999,7 @@ impl Server { request: TypedEnvelope, ) -> Result<()> { let receiver_ids = self.store_mut().await.start_language_server( - request.payload.project_id, + ProjectId::from_proto(request.payload.project_id), request.sender_id, request .payload @@ -947,10 +1018,10 @@ impl Server { self: Arc, request: TypedEnvelope, ) -> Result<()> { - let receiver_ids = self - .store() - .await - .project_connection_ids(request.payload.project_id, request.sender_id)?; + let receiver_ids = self.store().await.project_connection_ids( + ProjectId::from_proto(request.payload.project_id), + request.sender_id, + )?; broadcast(request.sender_id, receiver_ids, |connection_id| { self.peer .forward_send(request.sender_id, connection_id, request.payload.clone()) @@ -969,7 +1040,10 @@ impl Server { let host_connection_id = self .store() .await - .read_project(request.payload.remote_entity_id(), request.sender_id)? + .read_project( + ProjectId::from_proto(request.payload.remote_entity_id()), + request.sender_id, + )? .host_connection_id; response.send( @@ -985,10 +1059,11 @@ impl Server { request: TypedEnvelope, response: Response, ) -> Result<()> { + let project_id = ProjectId::from_proto(request.payload.project_id); let host = self .store() .await - .read_project(request.payload.project_id, request.sender_id)? + .read_project(project_id, request.sender_id)? .host_connection_id; let response_payload = self .peer @@ -998,7 +1073,7 @@ impl Server { let mut guests = self .store() .await - .read_project(request.payload.project_id, request.sender_id)? + .read_project(project_id, request.sender_id)? .connection_ids(); guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id); broadcast(host, guests, |conn_id| { @@ -1014,10 +1089,11 @@ impl Server { request: TypedEnvelope, response: Response, ) -> Result<()> { + let project_id = ProjectId::from_proto(request.payload.project_id); let receiver_ids = { let mut store = self.store_mut().await; - store.register_project_activity(request.payload.project_id, request.sender_id)?; - store.project_connection_ids(request.payload.project_id, request.sender_id)? + store.register_project_activity(project_id, request.sender_id)?; + store.project_connection_ids(project_id, request.sender_id)? }; broadcast(request.sender_id, receiver_ids, |connection_id| { @@ -1032,10 +1108,10 @@ impl Server { self: Arc, request: TypedEnvelope, ) -> Result<()> { - let receiver_ids = self - .store() - .await - .project_connection_ids(request.payload.project_id, request.sender_id)?; + let receiver_ids = self.store().await.project_connection_ids( + ProjectId::from_proto(request.payload.project_id), + request.sender_id, + )?; broadcast(request.sender_id, receiver_ids, |connection_id| { self.peer .forward_send(request.sender_id, connection_id, request.payload.clone()) @@ -1047,10 +1123,10 @@ impl Server { self: Arc, request: TypedEnvelope, ) -> Result<()> { - let receiver_ids = self - .store() - .await - .project_connection_ids(request.payload.project_id, request.sender_id)?; + let receiver_ids = self.store().await.project_connection_ids( + ProjectId::from_proto(request.payload.project_id), + request.sender_id, + )?; broadcast(request.sender_id, receiver_ids, |connection_id| { self.peer .forward_send(request.sender_id, connection_id, request.payload.clone()) @@ -1062,10 +1138,10 @@ impl Server { self: Arc, request: TypedEnvelope, ) -> Result<()> { - let receiver_ids = self - .store() - .await - .project_connection_ids(request.payload.project_id, request.sender_id)?; + let receiver_ids = self.store().await.project_connection_ids( + ProjectId::from_proto(request.payload.project_id), + request.sender_id, + )?; broadcast(request.sender_id, receiver_ids, |connection_id| { self.peer .forward_send(request.sender_id, connection_id, request.payload.clone()) @@ -1078,18 +1154,19 @@ impl Server { request: TypedEnvelope, response: Response, ) -> Result<()> { + let project_id = ProjectId::from_proto(request.payload.project_id); let leader_id = ConnectionId(request.payload.leader_id); let follower_id = request.sender_id; { let mut store = self.store_mut().await; if !store - .project_connection_ids(request.payload.project_id, follower_id)? + .project_connection_ids(project_id, follower_id)? .contains(&leader_id) { Err(anyhow!("no such peer"))?; } - store.register_project_activity(request.payload.project_id, follower_id)?; + store.register_project_activity(project_id, follower_id)?; } let mut response_payload = self @@ -1104,15 +1181,16 @@ impl Server { } async fn unfollow(self: Arc, request: TypedEnvelope) -> Result<()> { + let project_id = ProjectId::from_proto(request.payload.project_id); let leader_id = ConnectionId(request.payload.leader_id); let mut store = self.store_mut().await; if !store - .project_connection_ids(request.payload.project_id, request.sender_id)? + .project_connection_ids(project_id, request.sender_id)? .contains(&leader_id) { Err(anyhow!("no such peer"))?; } - store.register_project_activity(request.payload.project_id, request.sender_id)?; + store.register_project_activity(project_id, request.sender_id)?; self.peer .forward_send(request.sender_id, leader_id, request.payload)?; Ok(()) @@ -1122,10 +1200,10 @@ impl Server { self: Arc, request: TypedEnvelope, ) -> Result<()> { + let project_id = ProjectId::from_proto(request.payload.project_id); let mut store = self.store_mut().await; - store.register_project_activity(request.payload.project_id, request.sender_id)?; - let connection_ids = - store.project_connection_ids(request.payload.project_id, request.sender_id)?; + store.register_project_activity(project_id, request.sender_id)?; + let connection_ids = store.project_connection_ids(project_id, request.sender_id)?; let leader_id = request .payload .variant diff --git a/crates/collab/src/rpc/store.rs b/crates/collab/src/rpc/store.rs index 9fe9162edd67c405be4b105ff4cd5cf3d16a8fe7..561b7c6ab4c2745f47fe5c8e0b17c3111a6545c8 100644 --- a/crates/collab/src/rpc/store.rs +++ b/crates/collab/src/rpc/store.rs @@ -1,49 +1,58 @@ -use crate::db::{self, ChannelId, UserId}; +use crate::db::{self, ChannelId, ProjectId, UserId}; use anyhow::{anyhow, Result}; -use collections::{hash_map::Entry, BTreeMap, HashMap, HashSet}; +use collections::{ + btree_map, + hash_map::{self, Entry}, + BTreeMap, BTreeSet, HashMap, HashSet, +}; use rpc::{proto, ConnectionId, Receipt}; use serde::Serialize; use std::{ - collections::hash_map, - ffi::{OsStr, OsString}, mem, path::{Path, PathBuf}, str, - time::{Duration, Instant}, + time::Duration, }; +use time::OffsetDateTime; use tracing::instrument; #[derive(Default, Serialize)] pub struct Store { connections: HashMap, connections_by_user_id: HashMap>, - projects: HashMap, + projects: BTreeMap, #[serde(skip)] channels: HashMap, - next_project_id: u64, } #[derive(Serialize)] struct ConnectionState { user_id: UserId, admin: bool, - projects: HashSet, - requested_projects: HashSet, + projects: BTreeSet, + requested_projects: HashSet, channels: HashSet, } #[derive(Serialize)] pub struct Project { pub host_connection_id: ConnectionId, - pub host_user_id: UserId, - pub guests: HashMap, + pub host: Collaborator, + pub guests: HashMap, #[serde(skip)] pub join_requests: HashMap>>, pub active_replica_ids: HashSet, pub worktrees: BTreeMap, pub language_servers: Vec, +} + +#[derive(Serialize)] +pub struct Collaborator { + pub replica_id: ReplicaId, + pub user_id: UserId, #[serde(skip)] - last_activity: Option, + pub last_activity: Option, + pub admin: bool, } #[derive(Default, Serialize)] @@ -53,7 +62,7 @@ pub struct Worktree { #[serde(skip)] pub entries: HashMap, #[serde(skip)] - pub extension_counts: HashMap, + pub extension_counts: HashMap, #[serde(skip)] pub diagnostic_summaries: BTreeMap, pub scan_id: u64, @@ -69,8 +78,8 @@ pub type ReplicaId = u16; #[derive(Default)] pub struct RemovedConnectionState { pub user_id: UserId, - pub hosted_projects: HashMap, - pub guest_project_ids: HashSet, + pub hosted_projects: HashMap, + pub guest_project_ids: HashSet, pub contact_ids: HashSet, } @@ -93,6 +102,9 @@ pub struct Metrics { impl Store { pub fn metrics(&self) -> Metrics { + const ACTIVE_PROJECT_TIMEOUT: Duration = Duration::from_secs(60); + let active_window_start = OffsetDateTime::now_utc() - ACTIVE_PROJECT_TIMEOUT; + let connections = self.connections.values().filter(|c| !c.admin).count(); let mut registered_projects = 0; let mut active_projects = 0; @@ -101,7 +113,7 @@ impl Store { if let Some(connection) = self.connections.get(&project.host_connection_id) { if !connection.admin { registered_projects += 1; - if project.is_active() { + if project.is_active_since(active_window_start) { active_projects += 1; if !project.guests.is_empty() { shared_projects += 1; @@ -289,9 +301,9 @@ impl Store { let mut metadata = Vec::new(); for project_id in project_ids { if let Some(project) = self.projects.get(&project_id) { - if project.host_user_id == user_id { + if project.host.user_id == user_id { metadata.push(proto::ProjectMetadata { - id: project_id, + id: project_id.to_proto(), visible_worktree_root_names: project .worktrees .values() @@ -301,7 +313,7 @@ impl Store { guests: project .guests .values() - .map(|(_, user_id)| user_id.to_proto()) + .map(|guest| guest.user_id.to_proto()) .collect(), }); } @@ -314,32 +326,36 @@ impl Store { pub fn register_project( &mut self, host_connection_id: ConnectionId, - host_user_id: UserId, - ) -> u64 { - let project_id = self.next_project_id; + project_id: ProjectId, + ) -> Result<()> { + let connection = self + .connections + .get_mut(&host_connection_id) + .ok_or_else(|| anyhow!("no such connection"))?; + connection.projects.insert(project_id); self.projects.insert( project_id, Project { host_connection_id, - host_user_id, + host: Collaborator { + user_id: connection.user_id, + replica_id: 0, + last_activity: None, + admin: connection.admin, + }, guests: Default::default(), join_requests: Default::default(), active_replica_ids: Default::default(), worktrees: Default::default(), language_servers: Default::default(), - last_activity: None, }, ); - if let Some(connection) = self.connections.get_mut(&host_connection_id) { - connection.projects.insert(project_id); - } - self.next_project_id += 1; - project_id + Ok(()) } pub fn update_project( &mut self, - project_id: u64, + project_id: ProjectId, worktrees: &[proto::WorktreeMetadata], connection_id: ConnectionId, ) -> Result<()> { @@ -371,11 +387,11 @@ impl Store { pub fn unregister_project( &mut self, - project_id: u64, + project_id: ProjectId, connection_id: ConnectionId, ) -> Result { match self.projects.entry(project_id) { - hash_map::Entry::Occupied(e) => { + btree_map::Entry::Occupied(e) => { if e.get().host_connection_id == connection_id { let project = e.remove(); @@ -408,13 +424,13 @@ impl Store { Err(anyhow!("no such project"))? } } - hash_map::Entry::Vacant(_) => Err(anyhow!("no such project"))?, + btree_map::Entry::Vacant(_) => Err(anyhow!("no such project"))?, } } pub fn update_diagnostic_summary( &mut self, - project_id: u64, + project_id: ProjectId, worktree_id: u64, connection_id: ConnectionId, summary: proto::DiagnosticSummary, @@ -439,7 +455,7 @@ impl Store { pub fn start_language_server( &mut self, - project_id: u64, + project_id: ProjectId, connection_id: ConnectionId, language_server: proto::LanguageServer, ) -> Result> { @@ -458,7 +474,7 @@ impl Store { pub fn request_join_project( &mut self, requester_id: UserId, - project_id: u64, + project_id: ProjectId, receipt: Receipt, ) -> Result<()> { let connection = self @@ -470,7 +486,6 @@ impl Store { .get_mut(&project_id) .ok_or_else(|| anyhow!("no such project"))?; connection.requested_projects.insert(project_id); - project.last_activity = Some(Instant::now()); project .join_requests .entry(requester_id) @@ -483,7 +498,7 @@ impl Store { &mut self, responder_connection_id: ConnectionId, requester_id: UserId, - project_id: u64, + project_id: ProjectId, ) -> Option>> { let project = self.projects.get_mut(&project_id)?; if responder_connection_id != project.host_connection_id { @@ -495,7 +510,7 @@ impl Store { let requester_connection = self.connections.get_mut(&receipt.sender_id)?; requester_connection.requested_projects.remove(&project_id); } - project.last_activity = Some(Instant::now()); + project.host.last_activity = Some(OffsetDateTime::now_utc()); Some(receipts) } @@ -504,7 +519,7 @@ impl Store { &mut self, responder_connection_id: ConnectionId, requester_id: UserId, - project_id: u64, + project_id: ProjectId, ) -> Option<(Vec<(Receipt, ReplicaId)>, &Project)> { let project = self.projects.get_mut(&project_id)?; if responder_connection_id != project.host_connection_id { @@ -522,20 +537,26 @@ impl Store { replica_id += 1; } project.active_replica_ids.insert(replica_id); - project - .guests - .insert(receipt.sender_id, (replica_id, requester_id)); + project.guests.insert( + receipt.sender_id, + Collaborator { + replica_id, + user_id: requester_id, + last_activity: Some(OffsetDateTime::now_utc()), + admin: requester_connection.admin, + }, + ); receipts_with_replica_ids.push((receipt, replica_id)); } - project.last_activity = Some(Instant::now()); + project.host.last_activity = Some(OffsetDateTime::now_utc()); Some((receipts_with_replica_ids, project)) } pub fn leave_project( &mut self, connection_id: ConnectionId, - project_id: u64, + project_id: ProjectId, ) -> Result { let user_id = self.user_id_for_connection(connection_id)?; let project = self @@ -544,13 +565,12 @@ impl Store { .ok_or_else(|| anyhow!("no such project"))?; // If the connection leaving the project is a collaborator, remove it. - let remove_collaborator = - if let Some((replica_id, _)) = project.guests.remove(&connection_id) { - project.active_replica_ids.remove(&replica_id); - true - } else { - false - }; + let remove_collaborator = if let Some(guest) = project.guests.remove(&connection_id) { + project.active_replica_ids.remove(&guest.replica_id); + true + } else { + false + }; // If the connection leaving the project has a pending request, remove it. // If that user has no other pending requests on other connections, indicate that the request should be cancelled. @@ -579,11 +599,9 @@ impl Store { } } - project.last_activity = Some(Instant::now()); - Ok(LeftProject { host_connection_id: project.host_connection_id, - host_user_id: project.host_user_id, + host_user_id: project.host.user_id, connection_ids, cancel_request, unshare, @@ -594,13 +612,13 @@ impl Store { pub fn update_worktree( &mut self, connection_id: ConnectionId, - project_id: u64, + project_id: ProjectId, worktree_id: u64, worktree_root_name: &str, removed_entries: &[u64], updated_entries: &[proto::Entry], scan_id: u64, - ) -> Result<(Vec, bool, &HashMap)> { + ) -> Result<(Vec, bool, HashMap)> { let project = self.write_project(project_id, connection_id)?; let connection_ids = project.connection_ids(); let mut worktree = project.worktrees.entry(worktree_id).or_default(); @@ -642,12 +660,16 @@ impl Store { } worktree.scan_id = scan_id; - Ok((connection_ids, metadata_changed, &worktree.extension_counts)) + Ok(( + connection_ids, + metadata_changed, + worktree.extension_counts.clone(), + )) } pub fn project_connection_ids( &self, - project_id: u64, + project_id: ProjectId, acting_connection_id: ConnectionId, ) -> Result> { Ok(self @@ -663,7 +685,7 @@ impl Store { .connection_ids()) } - pub fn project(&self, project_id: u64) -> Result<&Project> { + pub fn project(&self, project_id: ProjectId) -> Result<&Project> { self.projects .get(&project_id) .ok_or_else(|| anyhow!("no such project")) @@ -671,14 +693,33 @@ impl Store { pub fn register_project_activity( &mut self, - project_id: u64, + project_id: ProjectId, connection_id: ConnectionId, ) -> Result<()> { - self.write_project(project_id, connection_id)?.last_activity = Some(Instant::now()); + let project = self + .projects + .get_mut(&project_id) + .ok_or_else(|| anyhow!("no such project"))?; + let collaborator = if connection_id == project.host_connection_id { + &mut project.host + } else if let Some(guest) = project.guests.get_mut(&connection_id) { + guest + } else { + return Err(anyhow!("no such project"))?; + }; + collaborator.last_activity = Some(OffsetDateTime::now_utc()); Ok(()) } - pub fn read_project(&self, project_id: u64, connection_id: ConnectionId) -> Result<&Project> { + pub fn projects(&self) -> impl Iterator { + self.projects.iter() + } + + pub fn read_project( + &self, + project_id: ProjectId, + connection_id: ConnectionId, + ) -> Result<&Project> { let project = self .projects .get(&project_id) @@ -694,7 +735,7 @@ impl Store { fn write_project( &mut self, - project_id: u64, + project_id: ProjectId, connection_id: ConnectionId, ) -> Result<&mut Project> { let project = self @@ -768,7 +809,7 @@ impl Store { project .guests .values() - .map(|(replica_id, _)| *replica_id) + .map(|guest| guest.replica_id) .collect::>(), ); } @@ -783,11 +824,15 @@ impl Store { } impl Project { - fn is_active(&self) -> bool { - const ACTIVE_PROJECT_TIMEOUT: Duration = Duration::from_secs(60); - self.last_activity.map_or(false, |last_activity| { - last_activity.elapsed() < ACTIVE_PROJECT_TIMEOUT - }) + fn is_active_since(&self, start_time: OffsetDateTime) -> bool { + self.guests + .values() + .chain([&self.host]) + .any(|collaborator| { + collaborator + .last_activity + .map_or(false, |active_time| active_time > start_time) + }) } pub fn guest_connection_ids(&self) -> Vec { @@ -809,9 +854,10 @@ impl Channel { } } -fn extension_for_entry(entry: &proto::Entry) -> Option<&OsStr> { +fn extension_for_entry(entry: &proto::Entry) -> Option<&str> { str::from_utf8(&entry.path) .ok() .map(Path::new) .and_then(|p| p.extension()) + .and_then(|e| e.to_str()) } diff --git a/crates/gpui/Cargo.toml b/crates/gpui/Cargo.toml index 6caddfb26bdd974595447fc215cc1d28735c6cd4..72c65a4c337ff11fb828f324b5cef5995a6f5e72 100644 --- a/crates/gpui/Cargo.toml +++ b/crates/gpui/Cargo.toml @@ -40,7 +40,7 @@ serde = { version = "1.0", features = ["derive", "rc"] } serde_json = "1.0" smallvec = { version = "1.6", features = ["union"] } smol = "1.2" -time = { version = "0.3" } +time = { version = "0.3", features = ["serde", "serde-well-known"] } tiny-skia = "0.5" tree-sitter = "0.20" usvg = "0.14"