diff --git a/Cargo.lock b/Cargo.lock index fb951e97f6eb079eb64c9035efbed20b66404326..ecfea4817a1936b6fa12b59956b971c2e255ca88 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -97,21 +97,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "alloc-no-stdlib" -version = "2.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5192ec435945d87bc2f70992b4d818154b5feede43c09fb7592146374eac90a6" - -[[package]] -name = "alloc-stdlib" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "697ed7edc0f1711de49ce108c541623a0af97c6c60b2f6e2b65229847ac843c2" -dependencies = [ - "alloc-no-stdlib", -] - [[package]] name = "ansi_term" version = "0.11.0" @@ -160,16 +145,6 @@ dependencies = [ "rust-embed", ] -[[package]] -name = "async-attributes" -version = "1.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3203e79f4dd9bdda415ed03cf14dae5a2bf775c683a00f94e9cd1faf0f596e5" -dependencies = [ - "quote", - "syn", -] - [[package]] name = "async-broadcast" version = "0.3.4" @@ -198,22 +173,11 @@ version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443ccbb270374a2b1055fc72da40e1f237809cd6bb0e97e66d264cd138473a6" dependencies = [ - "brotli", "flate2", "futures-core", "futures-io", "memchr", - "pin-project-lite 0.2.4", -] - -[[package]] -name = "async-dup" -version = "1.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7427a12b8dc09291528cfb1da2447059adb4a257388c2acd6497a79d55cf6f7c" -dependencies = [ - "futures-io", - "simple-mutex", + "pin-project-lite 0.2.8", ] [[package]] @@ -257,24 +221,6 @@ dependencies = [ "once_cell", ] -[[package]] -name = "async-h1" -version = "2.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc5142de15b549749cce62923a50714b0d7b77f5090ced141599e78899865451" -dependencies = [ - "async-channel", - "async-dup", - "async-std", - "byte-pool", - "futures-core", - "http-types", - "httparse", - "lazy_static", - "log", - "pin-project", -] - [[package]] name = "async-io" version = "1.3.1" @@ -372,48 +318,12 @@ dependencies = [ "webpki", ] -[[package]] -name = "async-session" -version = "2.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "345022a2eed092cd105cc1b26fd61c341e100bd5fcbbd792df4baf31c2cc631f" -dependencies = [ - "anyhow", - "async-std", - "async-trait", - "base64 0.12.3", - "bincode", - "blake3", - "chrono", - "hmac 0.8.1", - "kv-log-macro", - "rand 0.7.3", - "serde", - "serde_json", - "sha2 0.9.5", -] - -[[package]] -name = "async-sse" -version = "4.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53bba003996b8fd22245cd0c59b869ba764188ed435392cf2796d03b805ade10" -dependencies = [ - "async-channel", - "async-std", - "http-types", - "log", - "memchr", - "pin-project-lite 0.1.12", -] - [[package]] name = "async-std" version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9f06685bad74e0570f5213741bea82158279a4103d988e57bfada11ad230341" dependencies = [ - "async-attributes", "async-channel", "async-global-executor", "async-io", @@ -430,7 +340,7 @@ dependencies = [ "memchr", "num_cpus", "once_cell", - "pin-project-lite 0.2.4", + "pin-project-lite 0.2.8", "pin-utils", "slab", "wasm-bindgen-futures", @@ -475,8 +385,8 @@ dependencies = [ "futures-io", "futures-util", "log", - "pin-project-lite 0.2.4", - "tungstenite", + "pin-project-lite 0.2.8", + "tungstenite 0.16.0", ] [[package]] @@ -545,6 +455,55 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" +[[package]] +name = "axum" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f523b4e98ba6897ae90994bc18423d9877c54f9047b06a00ddc8122a957b1c70" +dependencies = [ + "async-trait", + "axum-core", + "base64 0.13.0", + "bitflags", + "bytes 1.0.1", + "futures-util", + "headers", + "http", + "http-body", + "hyper", + "itoa 1.0.1", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite 0.2.8", + "serde", + "serde_json", + "serde_urlencoded", + "sha-1 0.10.0", + "sync_wrapper", + "tokio", + "tokio-tungstenite", + "tower", + "tower-http", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3ddbd16eabff8b45f21b98671fddcc93daaa7ac4c84f8473693437226040de5" +dependencies = [ + "async-trait", + "bytes 1.0.1", + "futures-util", + "http", + "http-body", + "mime", +] + [[package]] name = "backtrace" version = "0.3.64" @@ -618,9 +577,9 @@ dependencies = [ [[package]] name = "bitflags" -version = "1.2.1" +version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitvec" @@ -634,21 +593,6 @@ dependencies = [ "wyz", ] -[[package]] -name = "blake3" -version = "0.3.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b64485778c4f16a6a5a9d335e80d449ac6c70cdd6a06d2af18a6f6f775a125b3" -dependencies = [ - "arrayref", - "arrayvec 0.5.2", - "cc", - "cfg-if 0.1.10", - "constant_time_eq", - "crypto-mac 0.8.0", - "digest 0.9.0", -] - [[package]] name = "block" version = "0.1.6" @@ -702,27 +646,6 @@ dependencies = [ "workspace", ] -[[package]] -name = "brotli" -version = "3.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f29919120f08613aadcd4383764e00526fc9f18b6c0895814faeed0dd78613e" -dependencies = [ - "alloc-no-stdlib", - "alloc-stdlib", - "brotli-decompressor", -] - -[[package]] -name = "brotli-decompressor" -version = "2.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1052e1c3b8d4d80eb84a8b94f0a1498797b5fb96314c001156a1c761940ef4ec" -dependencies = [ - "alloc-no-stdlib", - "alloc-stdlib", -] - [[package]] name = "bstr" version = "0.2.15" @@ -744,16 +667,6 @@ version = "3.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c59e7af012c713f529e7a3ee57ce9b31ddd858d4b512923602f74608b009631" -[[package]] -name = "byte-pool" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8c7230ddbb427b1094d477d821a99f3f54d36333178eeb806e279bcdcecf0ca" -dependencies = [ - "crossbeam-queue", - "stable_deref_trait", -] - [[package]] name = "bytemuck" version = "1.5.1" @@ -838,7 +751,6 @@ dependencies = [ "libc", "num-integer", "num-traits", - "serde", "time 0.1.44", "winapi 0.3.9", ] @@ -1012,10 +924,9 @@ name = "collab" version = "0.1.0" dependencies = [ "anyhow", - "async-io", - "async-std", "async-trait", "async-tungstenite", + "axum", "base64 0.13.0", "client", "collections", @@ -1039,14 +950,14 @@ dependencies = [ "serde", "serde_json", "settings", - "sha-1", + "sha-1 0.9.6", "sqlx", - "surf", "theme", - "tide", - "tide-compress", "time 0.2.27", + "tokio", + "tokio-tungstenite", "toml", + "tower", "util", "workspace", ] @@ -1096,12 +1007,6 @@ version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f92cfa0fd5690b3cf8c1ef2cabbd9b7ef22fa53cf5e1f92b05103f6d5d1cf6e7" -[[package]] -name = "constant_time_eq" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" - [[package]] name = "contacts_panel" version = "0.1.0" @@ -1319,16 +1224,6 @@ dependencies = [ "typenum", ] -[[package]] -name = "crypto-mac" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b584a330336237c1eecd3e94266efb216c56ed91225d634cb2991c5f3fd1aeab" -dependencies = [ - "generic-array", - "subtle", -] - [[package]] name = "crypto-mac" version = "0.10.0" @@ -1713,22 +1608,6 @@ dependencies = [ "instant", ] -[[package]] -name = "femme" -version = "2.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2af1a24f391a5a94d756db5092c6576aad494b88a71a5a36b20c67b63e0df034" -dependencies = [ - "cfg-if 0.1.10", - "js-sys", - "log", - "serde", - "serde_derive", - "serde_json", - "wasm-bindgen", - "web-sys", -] - [[package]] name = "file_finder" version = "0.1.0" @@ -1940,9 +1819,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.12" +version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2d31b7ec7efab6eefc7c57233bb10b847986139d88cc2f5a02a1ae6871a1846" +checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010" dependencies = [ "futures-core", "futures-sink", @@ -1967,9 +1846,9 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.12" +version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28be053525281ad8259d47e4de5de657b25e7bac113458555bb4b70bc6870500" +checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b" [[package]] name = "futures-lite" @@ -1982,17 +1861,16 @@ dependencies = [ "futures-io", "memchr", "parking", - "pin-project-lite 0.2.4", + "pin-project-lite 0.2.8", "waker-fn", ] [[package]] name = "futures-macro" -version = "0.3.12" +version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c287d25add322d9f9abdcdc5927ca398917996600182178774032e9f8258fedd" +checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512" dependencies = [ - "proc-macro-hack", "proc-macro2", "quote", "syn", @@ -2000,21 +1878,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.14" +version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c5629433c555de3d82861a7a4e3794a4c40040390907cfbfd7143a92a426c23" +checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868" [[package]] name = "futures-task" -version = "0.3.14" +version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba7aa51095076f3ba6d9a1f702f74bd05ec65f555d70d2033d55ba8d69f581bc" +checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a" [[package]] name = "futures-util" -version = "0.3.12" +version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "632a8cd0f2a4b3fdea1657f08bde063848c3bd00f9bbf6e256b8be78802e624b" +checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a" dependencies = [ "futures-channel", "futures-core", @@ -2023,10 +1901,8 @@ dependencies = [ "futures-sink", "futures-task", "memchr", - "pin-project-lite 0.2.4", + "pin-project-lite 0.2.8", "pin-utils", - "proc-macro-hack", - "proc-macro-nested", "slab", ] @@ -2239,6 +2115,31 @@ dependencies = [ "hashbrown 0.11.2", ] +[[package]] +name = "headers" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cff78e5788be1e0ab65b04d306b2ed5092c815ec97ec70f4ebd5aee158aa55d" +dependencies = [ + "base64 0.13.0", + "bitflags", + "bytes 1.0.1", + "headers-core", + "http", + "httpdate", + "mime", + "sha-1 0.10.0", +] + +[[package]] +name = "headers-core" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" +dependencies = [ + "http", +] + [[package]] name = "heck" version = "0.3.3" @@ -2279,16 +2180,6 @@ dependencies = [ "hmac 0.10.1", ] -[[package]] -name = "hmac" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "126888268dcc288495a26bf004b38c5fdbb31682f992c84ceb046a1f0fe38840" -dependencies = [ - "crypto-mac 0.8.0", - "digest 0.9.0", -] - [[package]] name = "hmac" version = "0.10.1" @@ -2311,13 +2202,13 @@ dependencies = [ [[package]] name = "http" -version = "0.2.4" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "527e8c9ac747e28542699a951517aa9a6945af506cd1f2e1b53a576c17b6cc11" +checksum = "31f4c6746584866f0feabcc69893c5b51beef3831656a968ed7ae254cdc4fd03" dependencies = [ "bytes 1.0.1", "fnv", - "itoa 0.4.7", + "itoa 1.0.1", ] [[package]] @@ -2329,6 +2220,17 @@ dependencies = [ "base64 0.12.3", ] +[[package]] +name = "http-body" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6" +dependencies = [ + "bytes 1.0.1", + "http", + "pin-project-lite 0.2.8", +] + [[package]] name = "http-client" version = "6.4.1" @@ -2344,6 +2246,12 @@ dependencies = [ "log", ] +[[package]] +name = "http-range-header" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29" + [[package]] name = "http-types" version = "2.11.1" @@ -2357,7 +2265,7 @@ dependencies = [ "cookie", "futures-lite", "infer", - "pin-project-lite 0.2.4", + "pin-project-lite 0.2.8", "rand 0.7.3", "serde", "serde_json", @@ -2368,9 +2276,15 @@ dependencies = [ [[package]] name = "httparse" -version = "1.4.1" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3a87b616e37e93c22fb19bcd386f02f3af5ea98a25670ad0fce773de23c5e68" +checksum = "6330e8a36bd8c859f3fa6d9382911fbb7147ec39807f63b923933a247240b9ba" + +[[package]] +name = "httpdate" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" [[package]] name = "humantime" @@ -2378,6 +2292,29 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" +[[package]] +name = "hyper" +version = "0.14.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b26ae0a80afebe130861d90abf98e3814a4f28a4c6ffeb5ab8ebb2be311e0ef2" +dependencies = [ + "bytes 1.0.1", + "futures-channel", + "futures-core", + "futures-util", + "http", + "http-body", + "httparse", + "httpdate", + "itoa 1.0.1", + "pin-project-lite 0.2.8", + "socket2 0.4.0", + "tokio", + "tower-service", + "tracing", + "want", +] + [[package]] name = "idna" version = "0.2.3" @@ -2480,7 +2417,7 @@ dependencies = [ "fnv", "lazy_static", "libc", - "mio", + "mio 0.6.23", "rand 0.7.3", "serde", "tempfile", @@ -2844,6 +2781,12 @@ version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08" +[[package]] +name = "matchit" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb" + [[package]] name = "maybe-uninit" version = "2.0.0" @@ -2956,12 +2899,25 @@ dependencies = [ "kernel32-sys", "libc", "log", - "miow", + "miow 0.2.2", "net2", "slab", "winapi 0.2.8", ] +[[package]] +name = "mio" +version = "0.7.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8067b404fe97c70829f082dec8bcf4f71225d7eaea1d8645349cb76fa06205cc" +dependencies = [ + "libc", + "log", + "miow 0.3.7", + "ntapi", + "winapi 0.3.9", +] + [[package]] name = "miow" version = "0.2.2" @@ -2974,6 +2930,15 @@ dependencies = [ "ws2_32-sys", ] +[[package]] +name = "miow" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21" +dependencies = [ + "winapi 0.3.9", +] + [[package]] name = "multimap" version = "0.8.3" @@ -3024,6 +2989,15 @@ dependencies = [ "version_check", ] +[[package]] +name = "ntapi" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28774a7fd2fbb4f0babd8237ce554b73af68021b5f695a3cebd6c59bac0980f" +dependencies = [ + "winapi 0.3.9", +] + [[package]] name = "num-bigint" version = "0.4.0" @@ -3369,9 +3343,9 @@ checksum = "257b64915a082f7811703966789728173279bdebb956b143dbcd23f6f970a777" [[package]] name = "pin-project-lite" -version = "0.2.4" +version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "439697af366c49a6d0a010c56a0d97685bc140ce0d377b13a2ea2aa42d64a827" +checksum = "e280fbe77cc62c91527259e9442153f4688736748d24660126286329742b4c6c" [[package]] name = "pin-utils" @@ -3493,12 +3467,6 @@ version = "0.5.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" -[[package]] -name = "proc-macro-nested" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc881b2c22681370c6a780e47af9840ef841837bc98118431d4e1868bd0c1086" - [[package]] name = "proc-macro2" version = "1.0.36" @@ -3883,12 +3851,6 @@ dependencies = [ "winapi 0.3.9", ] -[[package]] -name = "route-recognizer" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56770675ebc04927ded3e60633437841581c285dc6236109ea25fbf3beb7b59e" - [[package]] name = "roxmltree" version = "0.14.1" @@ -4334,6 +4296,17 @@ dependencies = [ "opaque-debug", ] +[[package]] +name = "sha-1" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "028f48d513f9678cda28f6e4064755b3fbb2af6acd672f2c209b62323f7aea0f" +dependencies = [ + "cfg-if 1.0.0", + "cpufeatures 0.2.1", + "digest 0.10.3", +] + [[package]] name = "sha1" version = "0.6.0" @@ -4395,15 +4368,6 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ad1d488a557b235fc46dae55512ffbfc429d2482b08b4d9435ab07384ca8aec" -[[package]] -name = "simple-mutex" -version = "1.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38aabbeafa6f6dead8cebf246fe9fae1f9215c8d29b3a69f93bd62a9e4a3dcd6" -dependencies = [ - "event-listener", -] - [[package]] name = "simple_asn1" version = "0.5.3" @@ -4596,7 +4560,7 @@ dependencies = [ "rustls", "serde", "serde_json", - "sha-1", + "sha-1 0.9.6", "sha2 0.9.5", "smallvec", "sqlformat", @@ -4641,12 +4605,6 @@ dependencies = [ "async-std", ] -[[package]] -name = "stable_deref_trait" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" - [[package]] name = "standback" version = "0.2.17" @@ -4766,7 +4724,7 @@ dependencies = [ "log", "mime_guess", "once_cell", - "pin-project-lite 0.2.4", + "pin-project-lite 0.2.8", "serde", "serde_json", "web-sys", @@ -4818,6 +4776,12 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "sync_wrapper" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8" + [[package]] name = "synstructure" version = "0.12.4" @@ -4971,41 +4935,6 @@ dependencies = [ "once_cell", ] -[[package]] -name = "tide" -version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c459573f0dd2cc734b539047f57489ea875af8ee950860ded20cf93a79a1dee0" -dependencies = [ - "async-h1", - "async-session", - "async-sse", - "async-std", - "async-trait", - "femme", - "futures-util", - "http-client", - "http-types", - "kv-log-macro", - "log", - "pin-project-lite 0.2.4", - "route-recognizer", - "serde", - "serde_json", -] - -[[package]] -name = "tide-compress" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d59e3885ecbc547a611d81e501b51bb5f52abd44c3eb3b733ac3c44ff2f2619" -dependencies = [ - "async-compression", - "futures-lite", - "http-types", - "tide", -] - [[package]] name = "tiff" version = "0.6.1" @@ -5119,6 +5048,61 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" +[[package]] +name = "tokio" +version = "1.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c27a64b625de6d309e8c57716ba93021dccf1b3b5c97edd6d3dd2d2135afc0a" +dependencies = [ + "bytes 1.0.1", + "libc", + "memchr", + "mio 0.7.14", + "num_cpus", + "once_cell", + "parking_lot", + "pin-project-lite 0.2.8", + "signal-hook-registry", + "tokio-macros", + "winapi 0.3.9", +] + +[[package]] +name = "tokio-macros" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b557f72f448c511a979e2564e55d74e6c4432fc96ff4f6241bc6bded342643b7" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tokio-tungstenite" +version = "0.17.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06cda1232a49558c46f8a504d5b93101d42c0bf7f911f12a105ba48168f821ae" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite 0.17.2", +] + +[[package]] +name = "tokio-util" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0edfdeb067411dba2044da6d1cb2df793dd35add7888d73c16e3381ded401764" +dependencies = [ + "bytes 1.0.1", + "futures-core", + "futures-sink", + "pin-project-lite 0.2.8", + "tokio", +] + [[package]] name = "toml" version = "0.5.8" @@ -5128,6 +5112,54 @@ dependencies = [ "serde", ] +[[package]] +name = "tower" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a89fd63ad6adf737582df5db40d286574513c69a11dac5214dc3b5603d6713e" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite 0.2.8", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-http" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aba3f3efabf7fb41fae8534fc20a817013dd1c12cb45441efb6c82e6556b4cd8" +dependencies = [ + "bitflags", + "bytes 1.0.1", + "futures-core", + "futures-util", + "http", + "http-body", + "http-range-header", + "pin-project-lite 0.2.8", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "tower-layer" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "343bc9466d3fe6b0f960ef45960509f84480bf4fd96f92901afe7ff3df9d3a62" + +[[package]] +name = "tower-service" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" + [[package]] name = "tracing" version = "0.1.26" @@ -5136,7 +5168,7 @@ checksum = "09adeb8c97449311ccd28a427f96fb563e7fd31aabf994189879d9da2394b89d" dependencies = [ "cfg-if 1.0.0", "log", - "pin-project-lite 0.2.4", + "pin-project-lite 0.2.8", "tracing-attributes", "tracing-core", ] @@ -5248,6 +5280,12 @@ dependencies = [ "tree-sitter", ] +[[package]] +name = "try-lock" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" + [[package]] name = "ttf-parser" version = "0.9.0" @@ -5273,7 +5311,26 @@ dependencies = [ "httparse", "log", "rand 0.8.3", - "sha-1", + "sha-1 0.9.6", + "thiserror", + "url", + "utf-8", +] + +[[package]] +name = "tungstenite" +version = "0.17.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d96a2dea40e7570482f28eb57afbe42d97551905da6a9400acc5c328d24004f5" +dependencies = [ + "base64 0.13.0", + "byteorder", + "bytes 1.0.1", + "http", + "httparse", + "log", + "rand 0.8.3", + "sha-1 0.10.0", "thiserror", "url", "utf-8", @@ -5529,6 +5586,16 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "want" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0" +dependencies = [ + "log", + "try-lock", +] + [[package]] name = "wasi" version = "0.9.0+wasi-snapshot-preview1" @@ -5548,8 +5615,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d54ee1d4ed486f78874278e63e4069fc1ab9f6a18ca492076ffb90c5eb2997fd" dependencies = [ "cfg-if 1.0.0", - "serde", - "serde_json", "wasm-bindgen-macro", ] diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index 93cbba48a190f1df50fb97c8b7be78103c40626a..dc8d8a574f7f9dc7d598ae2aa91f5c1758e82920 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -11,7 +11,7 @@ use async_tungstenite::tungstenite::{ error::Error as WebsocketError, http::{Request, StatusCode}, }; -use futures::{future::LocalBoxFuture, FutureExt, StreamExt}; +use futures::{future::LocalBoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt}; use gpui::{ actions, AnyModelHandle, AnyViewHandle, AnyWeakModelHandle, AnyWeakViewHandle, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task, View, ViewContext, ViewHandle, @@ -774,7 +774,7 @@ impl Client { "Authorization", format!("{} {}", credentials.user_id, credentials.access_token), ) - .header("X-Zed-Protocol-Version", rpc::PROTOCOL_VERSION); + .header("x-zed-protocol-version", rpc::PROTOCOL_VERSION); let http = self.http.clone(); cx.background().spawn(async move { @@ -817,13 +817,21 @@ impl Client { let request = request.uri(rpc_url.as_str()).body(())?; let (stream, _) = async_tungstenite::async_tls::client_async_tls(request, stream).await?; - Ok(Connection::new(stream)) + Ok(Connection::new( + stream + .map_err(|error| anyhow!(error)) + .sink_map_err(|error| anyhow!(error)), + )) } "http" => { rpc_url.set_scheme("ws").unwrap(); let request = request.uri(rpc_url.as_str()).body(())?; let (stream, _) = async_tungstenite::client_async(request, stream).await?; - Ok(Connection::new(stream)) + Ok(Connection::new( + stream + .map_err(|error| anyhow!(error)) + .sink_map_err(|error| anyhow!(error)), + )) } _ => Err(anyhow!("invalid rpc url: {}", rpc_url))?, } diff --git a/crates/collab/Cargo.toml b/crates/collab/Cargo.toml index 44b551cd05de4249653c603895e3c3b96bcda7ad..be0bace142a1d67cb3b0bd69925bbc5e3df0485a 100644 --- a/crates/collab/Cargo.toml +++ b/crates/collab/Cargo.toml @@ -16,15 +16,17 @@ required-features = ["seed-support"] collections = { path = "../collections" } rpc = { path = "../rpc" } util = { path = "../util" } + anyhow = "1.0.40" -async-io = "1.3" -async-std = { version = "1.8.0", features = ["attributes"] } async-trait = "0.1.50" async-tungstenite = "0.16" +axum = { version = "0.5", features = ["json", "headers", "ws"] } base64 = "0.13" envy = "0.4.2" +env_logger = "0.8" futures = "0.3" json_env_logger = "0.1" +lazy_static = "1.4" lipsum = { version = "0.8", optional = true } log = { version = "0.4.16", features = ["kv_unstable_serde"] } parking_lot = "0.11.1" @@ -33,9 +35,9 @@ scrypt = "0.7" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" sha-1 = "0.9" -surf = "2.2.0" -tide = "0.16.0" -tide-compress = "0.9.0" +tokio = { version = "1", features = ["full"] } +tokio-tungstenite = "0.17" +tower = "0.4" time = "0.2" toml = "0.5.8" diff --git a/crates/collab/src/api.rs b/crates/collab/src/api.rs index c909650f26e7466e09ab845b6174da062747da95..818e92316c19a8f9d171fce0d99b3c300a2dfb60 100644 --- a/crates/collab/src/api.rs +++ b/crates/collab/src/api.rs @@ -1,179 +1,184 @@ -use crate::{auth, db::UserId, AppState, Request, RequestExt as _}; -use async_trait::async_trait; -use serde::Deserialize; -use serde_json::json; +use crate::{ + auth, + db::{User, UserId}, + AppState, Error, Result, +}; +use anyhow::anyhow; +use axum::{ + body::Body, + extract::{Path, Query}, + http::{self, Request, StatusCode}, + middleware::{self, Next}, + response::IntoResponse, + routing::{get, post, put}, + Extension, Json, Router, +}; +use serde::{Deserialize, Serialize}; use std::sync::Arc; -use surf::StatusCode; - -pub fn add_routes(app: &mut tide::Server>) { - app.at("/users").get(get_users); - app.at("/users").post(create_user); - app.at("/users/:id").put(update_user); - app.at("/users/:id").delete(destroy_user); - app.at("/users/:github_login").get(get_user); - app.at("/users/:github_login/access_tokens") - .post(create_access_token); +use tower::ServiceBuilder; + +pub fn routes(state: Arc) -> Router { + Router::new() + .route("/users", get(get_users).post(create_user)) + .route( + "/users/:id", + put(update_user).delete(destroy_user).get(get_user), + ) + .route("/users/:id/access_tokens", post(create_access_token)) + .layer( + ServiceBuilder::new() + .layer(Extension(state)) + .layer(middleware::from_fn(validate_api_token)), + ) + // TODO: Compression on API routes? } -async fn get_user(request: Request) -> tide::Result { - request.require_token().await?; - - let user = request - .db() - .get_user_by_github_login(request.param("github_login")?) - .await? - .ok_or_else(|| surf::Error::from_str(404, "user not found"))?; +pub async fn validate_api_token(req: Request, next: Next) -> impl IntoResponse { + let token = req + .headers() + .get(http::header::AUTHORIZATION) + .and_then(|header| header.to_str().ok()) + .ok_or_else(|| { + Error::Http( + StatusCode::BAD_REQUEST, + "missing authorization header".to_string(), + ) + })? + .strip_prefix("token ") + .ok_or_else(|| { + Error::Http( + StatusCode::BAD_REQUEST, + "invalid authorization header".to_string(), + ) + })?; + + let state = req.extensions().get::>().unwrap(); + + if token != state.api_token { + Err(Error::Http( + StatusCode::UNAUTHORIZED, + "invalid authorization token".to_string(), + ))? + } - Ok(tide::Response::builder(StatusCode::Ok) - .body(tide::Body::from_json(&user)?) - .build()) + Ok::<_, Error>(next.run(req).await) } -async fn get_users(request: Request) -> tide::Result { - request.require_token().await?; - - let users = request.db().get_all_users().await?; - - Ok(tide::Response::builder(StatusCode::Ok) - .body(tide::Body::from_json(&users)?) - .build()) +async fn get_users(Extension(app): Extension>) -> Result>> { + let users = app.db.get_all_users().await?; + Ok(Json(users)) } -async fn create_user(mut request: Request) -> tide::Result { - request.require_token().await?; - - #[derive(Deserialize)] - struct Params { - github_login: String, - admin: bool, - } - let params = request.body_json::().await?; +#[derive(Deserialize)] +struct CreateUserParams { + github_login: String, + admin: bool, +} - let user_id = request - .db() +async fn create_user( + Json(params): Json, + Extension(app): Extension>, +) -> Result> { + let user_id = app + .db .create_user(¶ms.github_login, params.admin) .await?; - let user = request.db().get_user_by_id(user_id).await?.ok_or_else(|| { - surf::Error::from_str( - StatusCode::InternalServerError, - "couldn't find the user we just created", - ) - })?; + let user = app + .db + .get_user_by_id(user_id) + .await? + .ok_or_else(|| anyhow!("couldn't find the user we just created"))?; - Ok(tide::Response::builder(StatusCode::Ok) - .body(tide::Body::from_json(&user)?) - .build()) + Ok(Json(user)) } -async fn update_user(mut request: Request) -> tide::Result { - request.require_token().await?; +#[derive(Deserialize)] +struct UpdateUserParams { + admin: bool, +} - #[derive(Deserialize)] - struct Params { - admin: bool, - } - let user_id = UserId( - request - .param("id")? - .parse::() - .map_err(|error| surf::Error::from_str(StatusCode::BadRequest, error.to_string()))?, - ); - let params = request.body_json::().await?; - - request - .db() - .set_user_is_admin(user_id, params.admin) +async fn update_user( + Path(user_id): Path, + Json(params): Json, + Extension(app): Extension>, +) -> Result<()> { + app.db + .set_user_is_admin(UserId(user_id), params.admin) .await?; + Ok(()) +} - Ok(tide::Response::builder(StatusCode::Ok).build()) +async fn destroy_user( + Path(user_id): Path, + Extension(app): Extension>, +) -> Result<()> { + app.db.destroy_user(UserId(user_id)).await?; + Ok(()) } -async fn destroy_user(request: Request) -> tide::Result { - request.require_token().await?; - let user_id = UserId( - request - .param("id")? - .parse::() - .map_err(|error| surf::Error::from_str(StatusCode::BadRequest, error.to_string()))?, - ); +async fn get_user( + Path(login): Path, + Extension(app): Extension>, +) -> Result> { + let user = app + .db + .get_user_by_github_login(&login) + .await? + .ok_or_else(|| anyhow!("user not found"))?; + Ok(Json(user)) +} - request.db().destroy_user(user_id).await?; +#[derive(Deserialize)] +struct CreateAccessTokenQueryParams { + public_key: String, + impersonate: Option, +} - Ok(tide::Response::builder(StatusCode::Ok).build()) +#[derive(Serialize)] +struct CreateAccessTokenResponse { + user_id: UserId, + encrypted_access_token: String, } -async fn create_access_token(request: Request) -> tide::Result { - request.require_token().await?; +async fn create_access_token( + Path(login): Path, + Query(params): Query, + Extension(app): Extension>, +) -> Result> { + // request.require_token().await?; - let user = request - .db() - .get_user_by_github_login(request.param("github_login")?) + let user = app + .db + .get_user_by_github_login(&login) .await? - .ok_or_else(|| surf::Error::from_str(StatusCode::NotFound, "user not found"))?; - - #[derive(Deserialize)] - struct QueryParams { - public_key: String, - impersonate: Option, - } - - let query_params: QueryParams = request.query().map_err(|_| { - surf::Error::from_str(StatusCode::UnprocessableEntity, "invalid query params") - })?; + .ok_or_else(|| anyhow!("user not found"))?; let mut user_id = user.id; - if let Some(impersonate) = query_params.impersonate { + if let Some(impersonate) = params.impersonate { if user.admin { - if let Some(impersonated_user) = - request.db().get_user_by_github_login(&impersonate).await? - { + if let Some(impersonated_user) = app.db.get_user_by_github_login(&impersonate).await? { user_id = impersonated_user.id; } else { - return Ok(tide::Response::builder(StatusCode::UnprocessableEntity) - .body(format!( - "Can't impersonate non-existent user {}", - impersonate - )) - .build()); + return Err(Error::Http( + StatusCode::UNPROCESSABLE_ENTITY, + format!("user {impersonate} does not exist"), + )); } } else { - return Ok(tide::Response::builder(StatusCode::Unauthorized) - .body(format!( - "Can't impersonate user {} because the real user isn't an admin", - impersonate - )) - .build()); + return Err(Error::Http( + StatusCode::UNAUTHORIZED, + format!("you do not have permission to impersonate other users"), + )); } } - let access_token = auth::create_access_token(request.db().as_ref(), user_id).await?; + let access_token = auth::create_access_token(app.db.as_ref(), user_id).await?; let encrypted_access_token = - auth::encrypt_access_token(&access_token, query_params.public_key.clone())?; + auth::encrypt_access_token(&access_token, params.public_key.clone())?; - Ok(tide::Response::builder(StatusCode::Ok) - .body(json!({"user_id": user_id, "encrypted_access_token": encrypted_access_token})) - .build()) -} - -#[async_trait] -pub trait RequestExt { - async fn require_token(&self) -> tide::Result<()>; -} - -#[async_trait] -impl RequestExt for Request { - async fn require_token(&self) -> tide::Result<()> { - let token = self - .header("Authorization") - .and_then(|header| header.get(0)) - .and_then(|header| header.as_str().strip_prefix("token ")) - .ok_or_else(|| surf::Error::from_str(403, "invalid authorization header"))?; - - if token == self.state().config.api_token { - Ok(()) - } else { - Err(tide::Error::from_str(403, "invalid authorization token")) - } - } + Ok(Json(CreateAccessTokenResponse { + user_id, + encrypted_access_token, + })) } diff --git a/crates/collab/src/auth.rs b/crates/collab/src/auth.rs index 0d2bb045d7cb74c6f37adde0808de237fd118829..aad331faecfa6e05328d6f665862b054ed11fe51 100644 --- a/crates/collab/src/auth.rs +++ b/crates/collab/src/auth.rs @@ -1,45 +1,47 @@ -use super::{ - db::{self, UserId}, - errors::TideResultExt, +use std::sync::Arc; + +use super::db::{self, UserId}; +use crate::{AppState, Error}; +use anyhow::{Context, Result}; +use axum::{ + http::{self, Request, StatusCode}, + middleware::Next, + response::IntoResponse, }; -use crate::Request; -use anyhow::{anyhow, Context}; use rand::thread_rng; -use rpc::auth as zed_auth; use scrypt::{ password_hash::{PasswordHash, PasswordHasher, PasswordVerifier, SaltString}, Scrypt, }; -use std::convert::TryFrom; -use surf::StatusCode; -use tide::Error; -pub async fn process_auth_header(request: &Request) -> tide::Result { - let mut auth_header = request - .header("Authorization") +pub async fn validate_header(mut req: Request, next: Next) -> impl IntoResponse { + let mut auth_header = req + .headers() + .get(http::header::AUTHORIZATION) + .and_then(|header| header.to_str().ok()) .ok_or_else(|| { - Error::new( - StatusCode::BadRequest, - anyhow!("missing authorization header"), + Error::Http( + StatusCode::BAD_REQUEST, + "missing authorization header".to_string(), ) })? - .last() - .as_str() .split_whitespace(); + let user_id = UserId(auth_header.next().unwrap_or("").parse().map_err(|_| { - Error::new( - StatusCode::BadRequest, - anyhow!("missing user id in authorization header"), + Error::Http( + StatusCode::BAD_REQUEST, + "missing user id in authorization header".to_string(), ) })?); + let access_token = auth_header.next().ok_or_else(|| { - Error::new( - StatusCode::BadRequest, - anyhow!("missing access token in authorization header"), + Error::Http( + StatusCode::BAD_REQUEST, + "missing access token in authorization header".to_string(), ) })?; - let state = request.state().clone(); + let state = req.extensions().get::>().unwrap(); let mut credentials_valid = false; for password_hash in state.db.get_access_token_hashes(user_id).await? { if verify_access_token(&access_token, &password_hash)? { @@ -48,20 +50,21 @@ pub async fn process_auth_header(request: &Request) -> tide::Result { } } - if !credentials_valid { - Err(Error::new( - StatusCode::Unauthorized, - anyhow!("invalid credentials"), - ))?; + if credentials_valid { + req.extensions_mut().insert(user_id); + Ok::<_, Error>(next.run(req).await) + } else { + Err(Error::Http( + StatusCode::UNAUTHORIZED, + "invalid credentials".to_string(), + )) } - - Ok(user_id) } const MAX_ACCESS_TOKENS_TO_STORE: usize = 8; -pub async fn create_access_token(db: &dyn db::Db, user_id: UserId) -> tide::Result { - let access_token = zed_auth::random_token(); +pub async fn create_access_token(db: &dyn db::Db, user_id: UserId) -> Result { + let access_token = rpc::auth::random_token(); let access_token_hash = hash_access_token(&access_token).context("failed to hash access token")?; db.create_access_token_hash(user_id, &access_token_hash, MAX_ACCESS_TOKENS_TO_STORE) @@ -69,7 +72,7 @@ pub async fn create_access_token(db: &dyn db::Db, user_id: UserId) -> tide::Resu Ok(access_token) } -fn hash_access_token(token: &str) -> tide::Result { +fn hash_access_token(token: &str) -> Result { // Avoid slow hashing in debug mode. let params = if cfg!(debug_assertions) { scrypt::Params::new(1, 1, 1).unwrap() @@ -87,16 +90,16 @@ fn hash_access_token(token: &str) -> tide::Result { .to_string()) } -pub fn encrypt_access_token(access_token: &str, public_key: String) -> tide::Result { +pub fn encrypt_access_token(access_token: &str, public_key: String) -> Result { let native_app_public_key = - zed_auth::PublicKey::try_from(public_key).context("failed to parse app public key")?; + rpc::auth::PublicKey::try_from(public_key).context("failed to parse app public key")?; let encrypted_access_token = native_app_public_key .encrypt_string(&access_token) .context("failed to encrypt access token with public key")?; Ok(encrypted_access_token) } -pub fn verify_access_token(token: &str, hash: &str) -> tide::Result { +pub fn verify_access_token(token: &str, hash: &str) -> Result { let hash = PasswordHash::new(hash)?; Ok(Scrypt.verify_password(token.as_bytes(), &hash).is_ok()) } diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index 1ae85559e0f115649ebe192024c0b3c4170ea657..157a2445e53b3e9ffbab5784f583821ef88463b3 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -1,26 +1,11 @@ use anyhow::Context; use anyhow::Result; -use async_std::task::{block_on, yield_now}; use async_trait::async_trait; use serde::Serialize; pub use sqlx::postgres::PgPoolOptions as DbOptions; use sqlx::{types::Uuid, FromRow}; use time::OffsetDateTime; -macro_rules! test_support { - ($self:ident, { $($token:tt)* }) => {{ - let body = async { - $($token)* - }; - if $self.test_mode { - yield_now().await; - block_on(body) - } else { - body.await - } - }}; -} - #[async_trait] pub trait Db: Send + Sync { async fn create_user(&self, github_login: &str, admin: bool) -> Result; @@ -77,20 +62,16 @@ pub trait Db: Send + Sync { pub struct PostgresDb { pool: sqlx::PgPool, - test_mode: bool, } impl PostgresDb { - pub async fn new(url: &str, max_connections: u32) -> tide::Result { + pub async fn new(url: &str, max_connections: u32) -> Result { let pool = DbOptions::new() .max_connections(max_connections) .connect(url) .await .context("failed to connect to postgres database")?; - Ok(Self { - pool, - test_mode: false, - }) + Ok(Self { pool }) } } @@ -99,27 +80,23 @@ impl Db for PostgresDb { // users async fn create_user(&self, github_login: &str, admin: bool) -> Result { - test_support!(self, { - let query = " + let query = " INSERT INTO users (github_login, admin) VALUES ($1, $2) ON CONFLICT (github_login) DO UPDATE SET github_login = excluded.github_login RETURNING id "; - Ok(sqlx::query_scalar(query) - .bind(github_login) - .bind(admin) - .fetch_one(&self.pool) - .await - .map(UserId)?) - }) + Ok(sqlx::query_scalar(query) + .bind(github_login) + .bind(admin) + .fetch_one(&self.pool) + .await + .map(UserId)?) } async fn get_all_users(&self) -> Result> { - test_support!(self, { - let query = "SELECT * FROM users ORDER BY github_login ASC"; - Ok(sqlx::query_as(query).fetch_all(&self.pool).await?) - }) + let query = "SELECT * FROM users ORDER BY github_login ASC"; + Ok(sqlx::query_as(query).fetch_all(&self.pool).await?) } async fn get_user_by_id(&self, id: UserId) -> Result> { @@ -129,57 +106,49 @@ impl Db for PostgresDb { async fn get_users_by_ids(&self, ids: Vec) -> Result> { let ids = ids.into_iter().map(|id| id.0).collect::>(); - test_support!(self, { - let query = " + let query = " SELECT users.* FROM users WHERE users.id = ANY ($1) "; - Ok(sqlx::query_as(query) - .bind(&ids) - .fetch_all(&self.pool) - .await?) - }) + Ok(sqlx::query_as(query) + .bind(&ids) + .fetch_all(&self.pool) + .await?) } async fn get_user_by_github_login(&self, github_login: &str) -> Result> { - test_support!(self, { - let query = "SELECT * FROM users WHERE github_login = $1 LIMIT 1"; - Ok(sqlx::query_as(query) - .bind(github_login) - .fetch_optional(&self.pool) - .await?) - }) + let query = "SELECT * FROM users WHERE github_login = $1 LIMIT 1"; + Ok(sqlx::query_as(query) + .bind(github_login) + .fetch_optional(&self.pool) + .await?) } async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> { - test_support!(self, { - let query = "UPDATE users SET admin = $1 WHERE id = $2"; - Ok(sqlx::query(query) - .bind(is_admin) - .bind(id.0) - .execute(&self.pool) - .await - .map(drop)?) - }) + let query = "UPDATE users SET admin = $1 WHERE id = $2"; + Ok(sqlx::query(query) + .bind(is_admin) + .bind(id.0) + .execute(&self.pool) + .await + .map(drop)?) } async fn destroy_user(&self, id: UserId) -> Result<()> { - test_support!(self, { - let query = "DELETE FROM access_tokens WHERE user_id = $1;"; - sqlx::query(query) - .bind(id.0) - .execute(&self.pool) - .await - .map(drop)?; - let query = "DELETE FROM users WHERE id = $1;"; - Ok(sqlx::query(query) - .bind(id.0) - .execute(&self.pool) - .await - .map(drop)?) - }) + let query = "DELETE FROM access_tokens WHERE user_id = $1;"; + sqlx::query(query) + .bind(id.0) + .execute(&self.pool) + .await + .map(drop)?; + let query = "DELETE FROM users WHERE id = $1;"; + Ok(sqlx::query(query) + .bind(id.0) + .execute(&self.pool) + .await + .map(drop)?) } // access tokens @@ -190,12 +159,11 @@ impl Db for PostgresDb { access_token_hash: &str, max_access_token_count: usize, ) -> Result<()> { - test_support!(self, { - let insert_query = " + let insert_query = " INSERT INTO access_tokens (user_id, hash) VALUES ($1, $2); "; - let cleanup_query = " + let cleanup_query = " DELETE FROM access_tokens WHERE id IN ( SELECT id from access_tokens @@ -205,35 +173,32 @@ impl Db for PostgresDb { ) "; - let mut tx = self.pool.begin().await?; - sqlx::query(insert_query) - .bind(user_id.0) - .bind(access_token_hash) - .execute(&mut tx) - .await?; - sqlx::query(cleanup_query) - .bind(user_id.0) - .bind(access_token_hash) - .bind(max_access_token_count as u32) - .execute(&mut tx) - .await?; - Ok(tx.commit().await?) - }) + let mut tx = self.pool.begin().await?; + sqlx::query(insert_query) + .bind(user_id.0) + .bind(access_token_hash) + .execute(&mut tx) + .await?; + sqlx::query(cleanup_query) + .bind(user_id.0) + .bind(access_token_hash) + .bind(max_access_token_count as u32) + .execute(&mut tx) + .await?; + Ok(tx.commit().await?) } async fn get_access_token_hashes(&self, user_id: UserId) -> Result> { - test_support!(self, { - let query = " + let query = " SELECT hash FROM access_tokens WHERE user_id = $1 ORDER BY id DESC "; - Ok(sqlx::query_scalar(query) - .bind(user_id.0) - .fetch_all(&self.pool) - .await?) - }) + Ok(sqlx::query_scalar(query) + .bind(user_id.0) + .fetch_all(&self.pool) + .await?) } // orgs @@ -241,94 +206,83 @@ impl Db for PostgresDb { #[allow(unused)] // Help rust-analyzer #[cfg(any(test, feature = "seed-support"))] async fn find_org_by_slug(&self, slug: &str) -> Result> { - test_support!(self, { - let query = " + let query = " SELECT * FROM orgs WHERE slug = $1 "; - Ok(sqlx::query_as(query) - .bind(slug) - .fetch_optional(&self.pool) - .await?) - }) + Ok(sqlx::query_as(query) + .bind(slug) + .fetch_optional(&self.pool) + .await?) } #[cfg(any(test, feature = "seed-support"))] async fn create_org(&self, name: &str, slug: &str) -> Result { - test_support!(self, { - let query = " + let query = " INSERT INTO orgs (name, slug) VALUES ($1, $2) RETURNING id "; - Ok(sqlx::query_scalar(query) - .bind(name) - .bind(slug) - .fetch_one(&self.pool) - .await - .map(OrgId)?) - }) + Ok(sqlx::query_scalar(query) + .bind(name) + .bind(slug) + .fetch_one(&self.pool) + .await + .map(OrgId)?) } #[cfg(any(test, feature = "seed-support"))] async fn add_org_member(&self, org_id: OrgId, user_id: UserId, is_admin: bool) -> Result<()> { - test_support!(self, { - let query = " + let query = " INSERT INTO org_memberships (org_id, user_id, admin) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING "; - Ok(sqlx::query(query) - .bind(org_id.0) - .bind(user_id.0) - .bind(is_admin) - .execute(&self.pool) - .await - .map(drop)?) - }) + Ok(sqlx::query(query) + .bind(org_id.0) + .bind(user_id.0) + .bind(is_admin) + .execute(&self.pool) + .await + .map(drop)?) } // channels #[cfg(any(test, feature = "seed-support"))] async fn create_org_channel(&self, org_id: OrgId, name: &str) -> Result { - test_support!(self, { - let query = " + let query = " INSERT INTO channels (owner_id, owner_is_user, name) VALUES ($1, false, $2) RETURNING id "; - Ok(sqlx::query_scalar(query) - .bind(org_id.0) - .bind(name) - .fetch_one(&self.pool) - .await - .map(ChannelId)?) - }) + Ok(sqlx::query_scalar(query) + .bind(org_id.0) + .bind(name) + .fetch_one(&self.pool) + .await + .map(ChannelId)?) } #[allow(unused)] // Help rust-analyzer #[cfg(any(test, feature = "seed-support"))] async fn get_org_channels(&self, org_id: OrgId) -> Result> { - test_support!(self, { - let query = " + let query = " SELECT * FROM channels WHERE channels.owner_is_user = false AND channels.owner_id = $1 "; - Ok(sqlx::query_as(query) - .bind(org_id.0) - .fetch_all(&self.pool) - .await?) - }) + Ok(sqlx::query_as(query) + .bind(org_id.0) + .fetch_all(&self.pool) + .await?) } async fn get_accessible_channels(&self, user_id: UserId) -> Result> { - test_support!(self, { - let query = " + let query = " SELECT channels.* FROM @@ -337,11 +291,10 @@ impl Db for PostgresDb { channel_memberships.user_id = $1 AND channel_memberships.channel_id = channels.id "; - Ok(sqlx::query_as(query) - .bind(user_id.0) - .fetch_all(&self.pool) - .await?) - }) + Ok(sqlx::query_as(query) + .bind(user_id.0) + .fetch_all(&self.pool) + .await?) } async fn can_user_access_channel( @@ -349,20 +302,18 @@ impl Db for PostgresDb { user_id: UserId, channel_id: ChannelId, ) -> Result { - test_support!(self, { - let query = " + let query = " SELECT id FROM channel_memberships WHERE user_id = $1 AND channel_id = $2 LIMIT 1 "; - Ok(sqlx::query_scalar::<_, i32>(query) - .bind(user_id.0) - .bind(channel_id.0) - .fetch_optional(&self.pool) - .await - .map(|e| e.is_some())?) - }) + Ok(sqlx::query_scalar::<_, i32>(query) + .bind(user_id.0) + .bind(channel_id.0) + .fetch_optional(&self.pool) + .await + .map(|e| e.is_some())?) } #[cfg(any(test, feature = "seed-support"))] @@ -372,20 +323,18 @@ impl Db for PostgresDb { user_id: UserId, is_admin: bool, ) -> Result<()> { - test_support!(self, { - let query = " + let query = " INSERT INTO channel_memberships (channel_id, user_id, admin) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING "; - Ok(sqlx::query(query) - .bind(channel_id.0) - .bind(user_id.0) - .bind(is_admin) - .execute(&self.pool) - .await - .map(drop)?) - }) + Ok(sqlx::query(query) + .bind(channel_id.0) + .bind(user_id.0) + .bind(is_admin) + .execute(&self.pool) + .await + .map(drop)?) } // messages @@ -398,23 +347,21 @@ impl Db for PostgresDb { timestamp: OffsetDateTime, nonce: u128, ) -> Result { - test_support!(self, { - let query = " + let query = " INSERT INTO channel_messages (channel_id, sender_id, body, sent_at, nonce) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (nonce) DO UPDATE SET nonce = excluded.nonce RETURNING id "; - Ok(sqlx::query_scalar(query) - .bind(channel_id.0) - .bind(sender_id.0) - .bind(body) - .bind(timestamp) - .bind(Uuid::from_u128(nonce)) - .fetch_one(&self.pool) - .await - .map(MessageId)?) - }) + Ok(sqlx::query_scalar(query) + .bind(channel_id.0) + .bind(sender_id.0) + .bind(body) + .bind(timestamp) + .bind(Uuid::from_u128(nonce)) + .fetch_one(&self.pool) + .await + .map(MessageId)?) } async fn get_channel_messages( @@ -423,8 +370,7 @@ impl Db for PostgresDb { count: usize, before_id: Option, ) -> Result> { - test_support!(self, { - let query = r#" + let query = r#" SELECT * FROM ( SELECT id, channel_id, sender_id, body, sent_at AT TIME ZONE 'UTC' as sent_at, nonce @@ -438,35 +384,32 @@ impl Db for PostgresDb { ) as recent_messages ORDER BY id ASC "#; - Ok(sqlx::query_as(query) - .bind(channel_id.0) - .bind(before_id.unwrap_or(MessageId::MAX)) - .bind(count as i64) - .fetch_all(&self.pool) - .await?) - }) + Ok(sqlx::query_as(query) + .bind(channel_id.0) + .bind(before_id.unwrap_or(MessageId::MAX)) + .bind(count as i64) + .fetch_all(&self.pool) + .await?) } #[cfg(test)] async fn teardown(&self, name: &str, url: &str) { use util::ResultExt; - test_support!(self, { - let query = " + let query = " SELECT pg_terminate_backend(pg_stat_activity.pid) FROM pg_stat_activity WHERE pg_stat_activity.datname = '{}' AND pid <> pg_backend_pid(); "; - sqlx::query(query) - .bind(name) - .execute(&self.pool) - .await - .log_err(); - self.pool.close().await; - ::drop_database(url) - .await - .log_err(); - }) + sqlx::query(query) + .bind(name) + .execute(&self.pool) + .await + .log_err(); + self.pool.close().await; + ::drop_database(url) + .await + .log_err(); } } @@ -704,12 +647,11 @@ pub mod tests { let name = format!("zed-test-{}", rng.gen::()); let url = format!("postgres://postgres@localhost/{}", name); let migrations_path = Path::new(concat!(env!("CARGO_MANIFEST_DIR"), "/migrations")); - let db = block_on(async { + let db = futures::executor::block_on(async { Postgres::create_database(&url) .await .expect("failed to create test db"); - let mut db = PostgresDb::new(&url, 5).await.unwrap(); - db.test_mode = true; + let db = PostgresDb::new(&url, 5).await.unwrap(); let migrator = Migrator::new(migrations_path).await.unwrap(); migrator.run(&db.pool).await.unwrap(); db @@ -737,7 +679,7 @@ pub mod tests { impl Drop for TestDb { fn drop(&mut self) { if let Some(db) = self.db.take() { - block_on(db.teardown(&self.name, &self.url)); + futures::executor::block_on(db.teardown(&self.name, &self.url)); } } } diff --git a/crates/collab/src/main.rs b/crates/collab/src/main.rs index d5f7a570ae5e8456e424df3c5f31c3e2606701ac..f0250a6835fa16227e04ccd1daa8c176591b395b 100644 --- a/crates/collab/src/main.rs +++ b/crates/collab/src/main.rs @@ -2,18 +2,16 @@ mod api; mod auth; mod db; mod env; -mod errors; mod rpc; -use ::rpc::Peer; -use async_std::net::TcpListener; -use async_trait::async_trait; +use axum::{body::Body, http::StatusCode, response::IntoResponse, Router}; use db::{Db, PostgresDb}; -use serde::Deserialize; -use std::sync::Arc; -use tide_compress::CompressMiddleware; -type Request = tide::Request>; +use serde::Deserialize; +use std::{ + net::{SocketAddr, TcpListener}, + sync::Arc, +}; #[derive(Default, Deserialize)] pub struct Config { @@ -24,39 +22,26 @@ pub struct Config { pub struct AppState { db: Arc, - config: Config, + api_token: String, } impl AppState { - async fn new(config: Config) -> tide::Result> { + async fn new(config: &Config) -> Result> { let db = PostgresDb::new(&config.database_url, 5).await?; - let this = Self { db: Arc::new(db), - config, + api_token: config.api_token.clone(), }; Ok(Arc::new(this)) } } -#[async_trait] -trait RequestExt { - fn db(&self) -> &Arc; -} - -#[async_trait] -impl RequestExt for Request { - fn db(&self) -> &Arc { - &self.state().db - } -} - -#[async_std::main] -async fn main() -> tide::Result<()> { +#[tokio::main] +async fn main() -> Result<()> { if std::env::var("LOG_JSON").is_ok() { json_env_logger::init(); } else { - tide::log::start(); + env_logger::init(); } if let Err(error) = env::load_dotenv() { @@ -67,32 +52,63 @@ async fn main() -> tide::Result<()> { } let config = envy::from_env::().expect("error loading config"); - let state = AppState::new(config).await?; - let rpc = Peer::new(); - run_server( - state.clone(), - rpc, - TcpListener::bind(&format!("0.0.0.0:{}", state.config.http_port)).await?, - ) - .await?; + let state = AppState::new(&config).await?; + + let listener = TcpListener::bind(&format!("0.0.0.0:{}", config.http_port)) + .expect("failed to bind TCP listener"); + + let app = Router::::new() + .merge(api::routes(state.clone())) + .merge(rpc::routes(state)); + + axum::Server::from_tcp(listener)? + .serve(app.into_make_service_with_connect_info::()) + .await?; + Ok(()) } -pub async fn run_server( - state: Arc, - rpc: Arc, - listener: TcpListener, -) -> tide::Result<()> { - let mut app = tide::with_state(state.clone()); - rpc::add_routes(&mut app, &rpc); +pub type Result = std::result::Result; - let mut web = tide::with_state(state.clone()); - web.with(CompressMiddleware::new()); - api::add_routes(&mut web); +pub enum Error { + Http(StatusCode, String), + Internal(anyhow::Error), +} - app.at("/").nest(web); +impl From for Error +where + E: Into, +{ + fn from(error: E) -> Self { + Self::Internal(error.into()) + } +} - app.listen(listener).await?; +impl IntoResponse for Error { + fn into_response(self) -> axum::response::Response { + match self { + Error::Http(code, message) => (code, message).into_response(), + Error::Internal(error) => { + (StatusCode::INTERNAL_SERVER_ERROR, format!("{}", &error)).into_response() + } + } + } +} - Ok(()) +impl std::fmt::Debug for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Error::Http(code, message) => (code, message).fmt(f), + Error::Internal(error) => error.fmt(f), + } + } +} + +impl std::fmt::Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Error::Http(code, message) => write!(f, "{code}: {message}"), + Error::Internal(error) => error.fmt(f), + } + } } diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 47b16e2633d6ac9e262cb0035bb6891edcdae33e..6c4775ba6d3229d030e02273e14734ed83ed9032 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -1,47 +1,56 @@ mod store; -use super::{ - auth::process_auth_header, +use crate::{ + auth, db::{ChannelId, MessageId, UserId}, - AppState, + AppState, Result, }; use anyhow::anyhow; -use async_io::Timer; -use async_std::{ - sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}, - task, +use async_tungstenite::tungstenite::{ + protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage, +}; +use axum::{ + body::Body, + extract::{ + ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage}, + ConnectInfo, WebSocketUpgrade, + }, + headers::{Header, HeaderName}, + http::StatusCode, + middleware, + response::{IntoResponse, Response}, + routing::get, + Extension, Router, TypedHeader, }; -use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream}; use collections::{HashMap, HashSet}; -use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt}; +use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt}; +use lazy_static::lazy_static; use log::{as_debug, as_display}; use rpc::{ proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage}, Connection, ConnectionId, Peer, TypedEnvelope, }; -use sha1::{Digest as _, Sha1}; use std::{ any::TypeId, future::Future, marker::PhantomData, + net::SocketAddr, ops::{Deref, DerefMut}, rc::Rc, sync::Arc, time::{Duration, Instant}, }; use store::{Store, Worktree}; -use surf::StatusCode; -use tide::{ - http::headers::{HeaderName, CONNECTION, UPGRADE}, - Request, Response, -}; use time::OffsetDateTime; +use tokio::{ + sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}, + time::Sleep, +}; +use tower::ServiceBuilder; use util::ResultExt; type MessageHandler = Box< - dyn Send - + Sync - + Fn(Arc, Box) -> BoxFuture<'static, tide::Result<()>>, + dyn Send + Sync + Fn(Arc, Box) -> BoxFuture<'static, Result<()>>, >; pub struct Server { @@ -53,9 +62,9 @@ pub struct Server { } pub trait Executor: Send + Clone { - type Timer: Send + Future; + type Sleep: Send + Future; fn spawn_detached>(&self, future: F); - fn timer(&self, duration: Duration) -> Self::Timer; + fn sleep(&self, duration: Duration) -> Self::Sleep; } #[derive(Clone)] @@ -77,11 +86,10 @@ struct StoreWriteGuard<'a> { impl Server { pub fn new( app_state: Arc, - peer: Arc, notifications: Option>, ) -> Arc { let mut server = Self { - peer, + peer: Peer::new(), app_state, store: Default::default(), handlers: Default::default(), @@ -141,7 +149,7 @@ impl Server { fn add_message_handler(&mut self, handler: F) -> &mut Self where F: 'static + Send + Sync + Fn(Arc, TypedEnvelope) -> Fut, - Fut: 'static + Send + Future>, + Fut: 'static + Send + Future>, M: EnvelopedMessage, { let prev_handler = self.handlers.insert( @@ -160,7 +168,7 @@ impl Server { fn add_request_handler(&mut self, handler: F) -> &mut Self where F: 'static + Send + Sync + Fn(Arc, TypedEnvelope) -> Fut, - Fut: 'static + Send + Future>, + Fut: 'static + Send + Future>, M: RequestMessage, { self.add_message_handler(move |server, envelope| { @@ -193,7 +201,7 @@ impl Server { F: 'static + Send + Sync - + Fn(Arc, &mut Store, TypedEnvelope) -> tide::Result, + + Fn(Arc, &mut Store, TypedEnvelope) -> Result, M: RequestMessage, { let handler = Arc::new(handler); @@ -237,7 +245,7 @@ impl Server { .add_connection(connection, { let executor = executor.clone(); move |duration| { - let timer = executor.timer(duration); + let timer = executor.sleep(duration); async move { timer.await; } @@ -308,7 +316,7 @@ impl Server { } } - async fn sign_out(self: &mut Arc, connection_id: ConnectionId) -> tide::Result<()> { + async fn sign_out(self: &mut Arc, connection_id: ConnectionId) -> Result<()> { self.peer.disconnect(connection_id); let mut state = self.state_mut().await; let removed_connection = state.remove_connection(connection_id)?; @@ -342,14 +350,14 @@ impl Server { Ok(()) } - async fn ping(self: Arc, _: TypedEnvelope) -> tide::Result { + async fn ping(self: Arc, _: TypedEnvelope) -> Result { Ok(proto::Ack {}) } async fn register_project( self: Arc, request: TypedEnvelope, - ) -> tide::Result { + ) -> Result { let project_id = { let mut state = self.state_mut().await; let user_id = state.user_id_for_connection(request.sender_id)?; @@ -361,7 +369,7 @@ impl Server { async fn unregister_project( self: Arc, request: TypedEnvelope, - ) -> tide::Result<()> { + ) -> Result<()> { let mut state = self.state_mut().await; let project = state.unregister_project(request.payload.project_id, request.sender_id)?; self.update_contacts_for_users(&*state, &project.authorized_user_ids()); @@ -371,7 +379,7 @@ impl Server { async fn share_project( self: Arc, request: TypedEnvelope, - ) -> tide::Result { + ) -> Result { let mut state = self.state_mut().await; let project = state.share_project(request.payload.project_id, request.sender_id)?; self.update_contacts_for_users(&mut *state, &project.authorized_user_ids); @@ -381,7 +389,7 @@ impl Server { async fn unshare_project( self: Arc, request: TypedEnvelope, - ) -> tide::Result<()> { + ) -> Result<()> { let project_id = request.payload.project_id; let mut state = self.state_mut().await; let project = state.unshare_project(project_id, request.sender_id)?; @@ -397,7 +405,7 @@ impl Server { self: Arc, state: &mut Store, request: TypedEnvelope, - ) -> tide::Result { + ) -> Result { let project_id = request.payload.project_id; let user_id = state.user_id_for_connection(request.sender_id)?; @@ -470,7 +478,7 @@ impl Server { async fn leave_project( self: Arc, request: TypedEnvelope, - ) -> tide::Result<()> { + ) -> Result<()> { let sender_id = request.sender_id; let project_id = request.payload.project_id; let mut state = self.state_mut().await; @@ -491,7 +499,7 @@ impl Server { async fn register_worktree( self: Arc, request: TypedEnvelope, - ) -> tide::Result { + ) -> Result { let mut contact_user_ids = HashSet::default(); for github_login in &request.payload.authorized_logins { let contact_user_id = self.app_state.db.create_user(github_login, false).await?; @@ -528,7 +536,7 @@ impl Server { async fn unregister_worktree( self: Arc, request: TypedEnvelope, - ) -> tide::Result<()> { + ) -> Result<()> { let project_id = request.payload.project_id; let worktree_id = request.payload.worktree_id; let mut state = self.state_mut().await; @@ -550,7 +558,7 @@ impl Server { async fn update_worktree( self: Arc, request: TypedEnvelope, - ) -> tide::Result { + ) -> Result { let connection_ids = self.state_mut().await.update_worktree( request.sender_id, request.payload.project_id, @@ -570,7 +578,7 @@ impl Server { async fn update_diagnostic_summary( self: Arc, request: TypedEnvelope, - ) -> tide::Result<()> { + ) -> Result<()> { let summary = request .payload .summary @@ -593,7 +601,7 @@ impl Server { async fn start_language_server( self: Arc, request: TypedEnvelope, - ) -> tide::Result<()> { + ) -> Result<()> { let receiver_ids = self.state_mut().await.start_language_server( request.payload.project_id, request.sender_id, @@ -613,7 +621,7 @@ impl Server { async fn update_language_server( self: Arc, request: TypedEnvelope, - ) -> tide::Result<()> { + ) -> Result<()> { let receiver_ids = self .state() .await @@ -628,7 +636,7 @@ impl Server { async fn forward_project_request( self: Arc, request: TypedEnvelope, - ) -> tide::Result + ) -> Result where T: EntityMessage + RequestMessage, { @@ -646,7 +654,7 @@ impl Server { async fn save_buffer( self: Arc, request: TypedEnvelope, - ) -> tide::Result { + ) -> Result { let host = self .state() .await @@ -673,7 +681,7 @@ impl Server { async fn update_buffer( self: Arc, request: TypedEnvelope, - ) -> tide::Result { + ) -> Result { let receiver_ids = self .state() .await @@ -688,7 +696,7 @@ impl Server { async fn update_buffer_file( self: Arc, request: TypedEnvelope, - ) -> tide::Result<()> { + ) -> Result<()> { let receiver_ids = self .state() .await @@ -703,7 +711,7 @@ impl Server { async fn buffer_reloaded( self: Arc, request: TypedEnvelope, - ) -> tide::Result<()> { + ) -> Result<()> { let receiver_ids = self .state() .await @@ -718,7 +726,7 @@ impl Server { async fn buffer_saved( self: Arc, request: TypedEnvelope, - ) -> tide::Result<()> { + ) -> Result<()> { let receiver_ids = self .state() .await @@ -733,7 +741,7 @@ impl Server { async fn follow( self: Arc, request: TypedEnvelope, - ) -> tide::Result { + ) -> Result { let leader_id = ConnectionId(request.payload.leader_id); let follower_id = request.sender_id; if !self @@ -754,10 +762,7 @@ impl Server { Ok(response) } - async fn unfollow( - self: Arc, - request: TypedEnvelope, - ) -> tide::Result<()> { + async fn unfollow(self: Arc, request: TypedEnvelope) -> Result<()> { let leader_id = ConnectionId(request.payload.leader_id); if !self .state() @@ -775,7 +780,7 @@ impl Server { async fn update_followers( self: Arc, request: TypedEnvelope, - ) -> tide::Result<()> { + ) -> Result<()> { let connection_ids = self .state() .await @@ -802,7 +807,7 @@ impl Server { async fn get_channels( self: Arc, request: TypedEnvelope, - ) -> tide::Result { + ) -> Result { let user_id = self .state() .await @@ -822,7 +827,7 @@ impl Server { async fn get_users( self: Arc, request: TypedEnvelope, - ) -> tide::Result { + ) -> Result { let user_ids = request .payload .user_ids @@ -867,7 +872,7 @@ impl Server { async fn join_channel( self: Arc, request: TypedEnvelope, - ) -> tide::Result { + ) -> Result { let user_id = self .state() .await @@ -908,7 +913,7 @@ impl Server { async fn leave_channel( self: Arc, request: TypedEnvelope, - ) -> tide::Result<()> { + ) -> Result<()> { let user_id = self .state() .await @@ -933,7 +938,7 @@ impl Server { async fn send_channel_message( self: Arc, request: TypedEnvelope, - ) -> tide::Result { + ) -> Result { let channel_id = ChannelId::from_proto(request.payload.channel_id); let user_id; let connection_ids; @@ -988,7 +993,7 @@ impl Server { async fn get_channel_messages( self: Arc, request: TypedEnvelope, - ) -> tide::Result { + ) -> Result { let user_id = self .state() .await @@ -1030,10 +1035,10 @@ impl Server { async fn state<'a>(self: &'a Arc) -> StoreReadGuard<'a> { #[cfg(test)] - async_std::task::yield_now().await; + tokio::task::yield_now().await; let guard = self.store.read().await; #[cfg(test)] - async_std::task::yield_now().await; + tokio::task::yield_now().await; StoreReadGuard { guard, _not_send: PhantomData, @@ -1042,10 +1047,10 @@ impl Server { async fn state_mut<'a>(self: &'a Arc) -> StoreWriteGuard<'a> { #[cfg(test)] - async_std::task::yield_now().await; + tokio::task::yield_now().await; let guard = self.store.write().await; #[cfg(test)] - async_std::task::yield_now().await; + tokio::task::yield_now().await; StoreWriteGuard { guard, _not_send: PhantomData, @@ -1083,14 +1088,14 @@ impl<'a> Drop for StoreWriteGuard<'a> { } impl Executor for RealExecutor { - type Timer = Timer; + type Sleep = Sleep; fn spawn_detached>(&self, future: F) { - task::spawn(future); + tokio::task::spawn(future); } - fn timer(&self, duration: Duration) -> Self::Timer { - Timer::after(duration) + fn sleep(&self, duration: Duration) -> Self::Sleep { + tokio::time::sleep(duration) } } @@ -1105,75 +1110,100 @@ where } } -pub fn add_routes(app: &mut tide::Server>, rpc: &Arc) { - let server = Server::new(app.state().clone(), rpc.clone(), None); - app.at("/rpc").get(move |request: Request>| { - let server = server.clone(); - async move { - const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; +lazy_static! { + static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version"); +} - let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade"); - let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket"); - let upgrade_requested = connection_upgrade && upgrade_to_websocket; - let client_protocol_version: Option = request - .header("X-Zed-Protocol-Version") - .and_then(|v| v.as_str().parse().ok()); +pub struct ProtocolVersion(u32); - if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) { - return Ok(Response::new(StatusCode::UpgradeRequired)); - } +impl Header for ProtocolVersion { + fn name() -> &'static HeaderName { + &ZED_PROTOCOL_VERSION + } - let header = match request.header("Sec-Websocket-Key") { - Some(h) => h.as_str(), - None => return Err(anyhow!("expected sec-websocket-key"))?, - }; + fn decode<'i, I>(values: &mut I) -> Result + where + Self: Sized, + I: Iterator, + { + let version = values + .next() + .ok_or_else(|| axum::headers::Error::invalid())? + .to_str() + .map_err(|_| axum::headers::Error::invalid())? + .parse() + .map_err(|_| axum::headers::Error::invalid())?; + Ok(Self(version)) + } - let user_id = process_auth_header(&request).await?; - - let mut response = Response::new(StatusCode::SwitchingProtocols); - response.insert_header(UPGRADE, "websocket"); - response.insert_header(CONNECTION, "Upgrade"); - let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize(); - response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..])); - response.insert_header("Sec-Websocket-Version", "13"); - - let http_res: &mut tide::http::Response = response.as_mut(); - let upgrade_receiver = http_res.recv_upgrade().await; - let addr = request.remote().unwrap_or("unknown").to_string(); - task::spawn(async move { - if let Some(stream) = upgrade_receiver.await { - server - .handle_connection( - Connection::new( - WebSocketStream::from_raw_socket(stream, Role::Server, None).await, - ), - addr, - user_id, - None, - RealExecutor, - ) - .await; - } - }); + fn encode>(&self, values: &mut E) { + values.extend([self.0.to_string().parse().unwrap()]); + } +} - Ok(response) - } - }); +pub fn routes(app_state: Arc) -> Router { + let server = Server::new(app_state.clone(), None); + Router::new() + .route("/rpc", get(handle_websocket_request)) + .layer( + ServiceBuilder::new() + .layer(Extension(app_state)) + .layer(middleware::from_fn(auth::validate_header)) + .layer(Extension(server)), + ) } -fn header_contains_ignore_case( - request: &tide::Request, - header_name: HeaderName, - value: &str, -) -> bool { - request - .header(header_name) - .map(|h| { - h.as_str() - .split(',') - .any(|s| s.trim().eq_ignore_ascii_case(value.trim())) - }) - .unwrap_or(false) +pub async fn handle_websocket_request( + TypedHeader(ProtocolVersion(protocol_version)): TypedHeader, + ConnectInfo(socket_address): ConnectInfo, + Extension(server): Extension>, + Extension(user_id): Extension, + ws: WebSocketUpgrade, +) -> Response { + if protocol_version != rpc::PROTOCOL_VERSION { + return ( + StatusCode::UPGRADE_REQUIRED, + "client must be upgraded".to_string(), + ) + .into_response(); + } + let socket_address = socket_address.to_string(); + ws.on_upgrade(move |socket| { + let socket = socket + .map_ok(to_tungstenite_message) + .err_into() + .with(|message| async move { Ok(to_axum_message(message)) }); + let connection = Connection::new(Box::pin(socket)); + server.handle_connection(connection, socket_address, user_id, None, RealExecutor) + }) +} + +fn to_axum_message(message: TungsteniteMessage) -> AxumMessage { + match message { + TungsteniteMessage::Text(payload) => AxumMessage::Text(payload), + TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload), + TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload), + TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload), + TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame { + code: frame.code.into(), + reason: frame.reason, + })), + } +} + +fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage { + match message { + AxumMessage::Text(payload) => TungsteniteMessage::Text(payload), + AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload), + AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload), + AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload), + AxumMessage::Close(frame) => { + TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame { + code: frame.code.into(), + reason: frame.reason, + })) + } + } } #[cfg(test)] @@ -1181,7 +1211,7 @@ mod tests { use super::*; use crate::{ db::{tests::TestDb, UserId}, - AppState, Config, + AppState, }; use ::rpc::Peer; use client::{ @@ -5621,7 +5651,7 @@ mod tests { let app_state = Self::build_app_state(&test_db).await; let peer = Peer::new(); let notifications = mpsc::unbounded(); - let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0)); + let server = Server::new(app_state.clone(), Some(notifications.0)); Self { peer, app_state, @@ -5736,11 +5766,9 @@ mod tests { } async fn build_app_state(test_db: &TestDb) -> Arc { - let mut config = Config::default(); - config.database_url = test_db.url.clone(); Arc::new(AppState { db: test_db.db().clone(), - config, + api_token: Default::default(), }) } @@ -5752,15 +5780,15 @@ mod tests { where F: FnMut(&Store) -> bool, { - async_std::future::timeout(Duration::from_millis(500), async { - while !(predicate)(&*self.server.store.read().await) { - self.foreground.start_waiting(); - self.notifications.next().await; - self.foreground.finish_waiting(); - } - }) - .await - .expect("condition timed out"); + assert!( + self.foreground.parking_forbidden(), + "you must call forbid_parking to use server conditions so we don't block indefinitely" + ); + while !(predicate)(&*self.server.store.read().await) { + self.foreground.start_waiting(); + self.notifications.next().await; + self.foreground.finish_waiting(); + } } } @@ -6325,13 +6353,13 @@ mod tests { } impl Executor for Arc { - type Timer = gpui::executor::Timer; + type Sleep = gpui::executor::Timer; fn spawn_detached>(&self, future: F) { self.spawn(future).detach(); } - fn timer(&self, duration: Duration) -> Self::Timer { + fn sleep(&self, duration: Duration) -> Self::Sleep { self.as_ref().timer(duration) } } diff --git a/crates/collab/src/rpc/store.rs b/crates/collab/src/rpc/store.rs index 946e9f84209ad3cef8d6e58cd8f54ab6479acee9..7a123ee484664b7d8177c0f81a1c96d99d00f9cd 100644 --- a/crates/collab/src/rpc/store.rs +++ b/crates/collab/src/rpc/store.rs @@ -1,5 +1,5 @@ use crate::db::{ChannelId, UserId}; -use anyhow::anyhow; +use anyhow::{anyhow, Result}; use collections::{BTreeMap, HashMap, HashSet}; use rpc::{proto, ConnectionId}; use std::{collections::hash_map, path::PathBuf}; @@ -99,7 +99,7 @@ impl Store { pub fn remove_connection( &mut self, connection_id: ConnectionId, - ) -> tide::Result { + ) -> Result { let connection = if let Some(connection) = self.connections.remove(&connection_id) { connection } else { @@ -165,7 +165,7 @@ impl Store { } } - pub fn user_id_for_connection(&self, connection_id: ConnectionId) -> tide::Result { + pub fn user_id_for_connection(&self, connection_id: ConnectionId) -> Result { Ok(self .connections .get(&connection_id) @@ -258,7 +258,7 @@ impl Store { worktree_id: u64, connection_id: ConnectionId, worktree: Worktree, - ) -> tide::Result<()> { + ) -> Result<()> { let project = self .projects .get_mut(&project_id) @@ -286,7 +286,7 @@ impl Store { &mut self, project_id: u64, connection_id: ConnectionId, - ) -> tide::Result { + ) -> Result { match self.projects.entry(project_id) { hash_map::Entry::Occupied(e) => { if e.get().host_connection_id == connection_id { @@ -326,7 +326,7 @@ impl Store { project_id: u64, worktree_id: u64, acting_connection_id: ConnectionId, - ) -> tide::Result<(Worktree, Vec)> { + ) -> Result<(Worktree, Vec)> { let project = self .projects .get_mut(&project_id) @@ -363,7 +363,7 @@ impl Store { &mut self, project_id: u64, connection_id: ConnectionId, - ) -> tide::Result { + ) -> Result { if let Some(project) = self.projects.get_mut(&project_id) { if project.host_connection_id == connection_id { let mut share = ProjectShare::default(); @@ -383,7 +383,7 @@ impl Store { &mut self, project_id: u64, acting_connection_id: ConnectionId, - ) -> tide::Result { + ) -> Result { let project = if let Some(project) = self.projects.get_mut(&project_id) { project } else { @@ -418,7 +418,7 @@ impl Store { worktree_id: u64, connection_id: ConnectionId, summary: proto::DiagnosticSummary, - ) -> tide::Result> { + ) -> Result> { let project = self .projects .get_mut(&project_id) @@ -443,7 +443,7 @@ impl Store { project_id: u64, connection_id: ConnectionId, language_server: proto::LanguageServer, - ) -> tide::Result> { + ) -> Result> { let project = self .projects .get_mut(&project_id) @@ -461,7 +461,7 @@ impl Store { connection_id: ConnectionId, user_id: UserId, project_id: u64, - ) -> tide::Result { + ) -> Result { let connection = self .connections .get_mut(&connection_id) @@ -498,7 +498,7 @@ impl Store { &mut self, connection_id: ConnectionId, project_id: u64, - ) -> tide::Result { + ) -> Result { let project = self .projects .get_mut(&project_id) @@ -533,7 +533,7 @@ impl Store { worktree_id: u64, removed_entries: &[u64], updated_entries: &[proto::Entry], - ) -> tide::Result> { + ) -> Result> { let project = self.write_project(project_id, connection_id)?; let worktree = project .share_mut()? @@ -554,13 +554,13 @@ impl Store { &self, project_id: u64, acting_connection_id: ConnectionId, - ) -> tide::Result> { + ) -> Result> { Ok(self .read_project(project_id, acting_connection_id)? .connection_ids()) } - pub fn channel_connection_ids(&self, channel_id: ChannelId) -> tide::Result> { + pub fn channel_connection_ids(&self, channel_id: ChannelId) -> Result> { Ok(self .channels .get(&channel_id) @@ -573,11 +573,7 @@ impl Store { self.projects.get(&project_id) } - pub fn read_project( - &self, - project_id: u64, - connection_id: ConnectionId, - ) -> tide::Result<&Project> { + pub fn read_project(&self, project_id: u64, connection_id: ConnectionId) -> Result<&Project> { let project = self .projects .get(&project_id) @@ -600,7 +596,7 @@ impl Store { &mut self, project_id: u64, connection_id: ConnectionId, - ) -> tide::Result<&mut Project> { + ) -> Result<&mut Project> { let project = self .projects .get_mut(&project_id) @@ -755,14 +751,14 @@ impl Project { } } - pub fn share(&self) -> tide::Result<&ProjectShare> { + pub fn share(&self) -> Result<&ProjectShare> { Ok(self .share .as_ref() .ok_or_else(|| anyhow!("worktree is not shared"))?) } - fn share_mut(&mut self) -> tide::Result<&mut ProjectShare> { + fn share_mut(&mut self) -> Result<&mut ProjectShare> { Ok(self .share .as_mut() diff --git a/crates/rpc/src/conn.rs b/crates/rpc/src/conn.rs index 53ba00a3c0e16257783a706b80f7bb832d69a4dc..dfb09a09131e1e3ba732b0f07edf96574b5861a1 100644 --- a/crates/rpc/src/conn.rs +++ b/crates/rpc/src/conn.rs @@ -1,14 +1,14 @@ -use async_tungstenite::tungstenite::{Error as WebSocketError, Message as WebSocketMessage}; +use async_tungstenite::tungstenite::Message as WebSocketMessage; use futures::{SinkExt as _, StreamExt as _}; pub struct Connection { pub(crate) tx: - Box>, + Box>, pub(crate) rx: Box< dyn 'static + Send + Unpin - + futures::Stream>, + + futures::Stream>, >, } @@ -18,8 +18,8 @@ impl Connection { S: 'static + Send + Unpin - + futures::Sink - + futures::Stream>, + + futures::Sink + + futures::Stream>, { let (tx, rx) = stream.split(); Self { @@ -28,7 +28,7 @@ impl Connection { } } - pub async fn send(&mut self, message: WebSocketMessage) -> Result<(), WebSocketError> { + pub async fn send(&mut self, message: WebSocketMessage) -> Result<(), anyhow::Error> { self.tx.send(message).await } @@ -54,40 +54,37 @@ impl Connection { killed: Arc, executor: Arc, ) -> ( - Box>, - Box< - dyn Send + Unpin + futures::Stream>, - >, + Box>, + Box>>, ) { + use anyhow::anyhow; use futures::channel::mpsc; use std::io::{Error, ErrorKind}; let (tx, rx) = mpsc::unbounded::(); - let tx = tx - .sink_map_err(|e| WebSocketError::from(Error::new(ErrorKind::Other, e))) - .with({ + let tx = tx.sink_map_err(|error| anyhow!(error)).with({ + let killed = killed.clone(); + let executor = Arc::downgrade(&executor); + move |msg| { let killed = killed.clone(); - let executor = Arc::downgrade(&executor); - move |msg| { - let killed = killed.clone(); - let executor = executor.clone(); - Box::pin(async move { - if let Some(executor) = executor.upgrade() { - executor.simulate_random_delay().await; - } + let executor = executor.clone(); + Box::pin(async move { + if let Some(executor) = executor.upgrade() { + executor.simulate_random_delay().await; + } - // Writes to a half-open TCP connection will error. - if killed.load(SeqCst) { - std::io::Result::Err( - Error::new(ErrorKind::Other, "connection lost").into(), - )?; - } + // Writes to a half-open TCP connection will error. + if killed.load(SeqCst) { + std::io::Result::Err( + Error::new(ErrorKind::Other, "connection lost").into(), + )?; + } - Ok(msg) - }) - } - }); + Ok(msg) + }) + } + }); let rx = rx.then({ let killed = killed.clone(); diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index a9f6b80f8e8d7895d705657d023f2b95a7c073a9..2d3bf639f415334664181ada72832051360ec08c 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -1,6 +1,6 @@ use super::{ConnectionId, PeerId, TypedEnvelope}; -use anyhow::Result; -use async_tungstenite::tungstenite::{Error as WebSocketError, Message as WebSocketMessage}; +use anyhow::{anyhow, Result}; +use async_tungstenite::tungstenite::Message as WebSocketMessage; use futures::{SinkExt as _, StreamExt as _}; use prost::Message as _; use std::any::{Any, TypeId}; @@ -318,9 +318,9 @@ impl MessageStream { impl MessageStream where - S: futures::Sink + Unpin, + S: futures::Sink + Unpin, { - pub async fn write(&mut self, message: Message) -> Result<(), WebSocketError> { + pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> { #[cfg(any(test, feature = "test-support"))] const COMPRESSION_LEVEL: i32 = -7; @@ -357,9 +357,9 @@ where impl MessageStream where - S: futures::Stream> + Unpin, + S: futures::Stream> + Unpin, { - pub async fn read(&mut self) -> Result { + pub async fn read(&mut self) -> Result { while let Some(bytes) = self.stream.next().await { match bytes? { WebSocketMessage::Binary(bytes) => { @@ -375,7 +375,7 @@ where _ => {} } } - Err(WebSocketError::ConnectionClosed) + Err(anyhow!("connection closed")) } }