Merge pull request #918 from zed-industries/tokio

Antonio Scandurra created

Switch collab server to Tokio/Axum

Change summary

Cargo.lock                     | 604 +++++++++++++++++++----------------
crates/client/src/client.rs    |  16 
crates/collab/Cargo.toml       |  12 
crates/collab/src/api.rs       | 285 ++++++++--------
crates/collab/src/auth.rs      |  77 ++--
crates/collab/src/db.rs        | 342 ++++++++-----------
crates/collab/src/main.rs      | 112 +++--
crates/collab/src/rpc.rs       | 316 ++++++++++--------
crates/collab/src/rpc/store.rs |  42 +-
crates/rpc/src/conn.rs         |  59 +-
crates/rpc/src/proto.rs        |  14 
11 files changed, 957 insertions(+), 922 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -97,21 +97,6 @@ dependencies = [
  "memchr",
 ]
 
-[[package]]
-name = "alloc-no-stdlib"
-version = "2.0.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5192ec435945d87bc2f70992b4d818154b5feede43c09fb7592146374eac90a6"
-
-[[package]]
-name = "alloc-stdlib"
-version = "0.2.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "697ed7edc0f1711de49ce108c541623a0af97c6c60b2f6e2b65229847ac843c2"
-dependencies = [
- "alloc-no-stdlib",
-]
-
 [[package]]
 name = "ansi_term"
 version = "0.11.0"
@@ -160,16 +145,6 @@ dependencies = [
  "rust-embed",
 ]
 
-[[package]]
-name = "async-attributes"
-version = "1.1.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a3203e79f4dd9bdda415ed03cf14dae5a2bf775c683a00f94e9cd1faf0f596e5"
-dependencies = [
- "quote",
- "syn",
-]
-
 [[package]]
 name = "async-broadcast"
 version = "0.3.4"
@@ -198,22 +173,11 @@ version = "0.3.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "5443ccbb270374a2b1055fc72da40e1f237809cd6bb0e97e66d264cd138473a6"
 dependencies = [
- "brotli",
  "flate2",
  "futures-core",
  "futures-io",
  "memchr",
- "pin-project-lite 0.2.4",
-]
-
-[[package]]
-name = "async-dup"
-version = "1.2.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7427a12b8dc09291528cfb1da2447059adb4a257388c2acd6497a79d55cf6f7c"
-dependencies = [
- "futures-io",
- "simple-mutex",
+ "pin-project-lite 0.2.8",
 ]
 
 [[package]]
@@ -257,24 +221,6 @@ dependencies = [
  "once_cell",
 ]
 
-[[package]]
-name = "async-h1"
-version = "2.3.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cc5142de15b549749cce62923a50714b0d7b77f5090ced141599e78899865451"
-dependencies = [
- "async-channel",
- "async-dup",
- "async-std",
- "byte-pool",
- "futures-core",
- "http-types",
- "httparse",
- "lazy_static",
- "log",
- "pin-project",
-]
-
 [[package]]
 name = "async-io"
 version = "1.3.1"
@@ -372,48 +318,12 @@ dependencies = [
  "webpki",
 ]
 
-[[package]]
-name = "async-session"
-version = "2.0.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "345022a2eed092cd105cc1b26fd61c341e100bd5fcbbd792df4baf31c2cc631f"
-dependencies = [
- "anyhow",
- "async-std",
- "async-trait",
- "base64 0.12.3",
- "bincode",
- "blake3",
- "chrono",
- "hmac 0.8.1",
- "kv-log-macro",
- "rand 0.7.3",
- "serde",
- "serde_json",
- "sha2 0.9.5",
-]
-
-[[package]]
-name = "async-sse"
-version = "4.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "53bba003996b8fd22245cd0c59b869ba764188ed435392cf2796d03b805ade10"
-dependencies = [
- "async-channel",
- "async-std",
- "http-types",
- "log",
- "memchr",
- "pin-project-lite 0.1.12",
-]
-
 [[package]]
 name = "async-std"
 version = "1.9.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d9f06685bad74e0570f5213741bea82158279a4103d988e57bfada11ad230341"
 dependencies = [
- "async-attributes",
  "async-channel",
  "async-global-executor",
  "async-io",
@@ -430,7 +340,7 @@ dependencies = [
  "memchr",
  "num_cpus",
  "once_cell",
- "pin-project-lite 0.2.4",
+ "pin-project-lite 0.2.8",
  "pin-utils",
  "slab",
  "wasm-bindgen-futures",
@@ -475,8 +385,8 @@ dependencies = [
  "futures-io",
  "futures-util",
  "log",
- "pin-project-lite 0.2.4",
- "tungstenite",
+ "pin-project-lite 0.2.8",
+ "tungstenite 0.16.0",
 ]
 
 [[package]]
@@ -545,6 +455,55 @@ version = "1.0.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
 
+[[package]]
+name = "axum"
+version = "0.5.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f523b4e98ba6897ae90994bc18423d9877c54f9047b06a00ddc8122a957b1c70"
+dependencies = [
+ "async-trait",
+ "axum-core",
+ "base64 0.13.0",
+ "bitflags",
+ "bytes 1.0.1",
+ "futures-util",
+ "headers",
+ "http",
+ "http-body",
+ "hyper",
+ "itoa 1.0.1",
+ "matchit",
+ "memchr",
+ "mime",
+ "percent-encoding",
+ "pin-project-lite 0.2.8",
+ "serde",
+ "serde_json",
+ "serde_urlencoded",
+ "sha-1 0.10.0",
+ "sync_wrapper",
+ "tokio",
+ "tokio-tungstenite",
+ "tower",
+ "tower-http",
+ "tower-layer",
+ "tower-service",
+]
+
+[[package]]
+name = "axum-core"
+version = "0.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d3ddbd16eabff8b45f21b98671fddcc93daaa7ac4c84f8473693437226040de5"
+dependencies = [
+ "async-trait",
+ "bytes 1.0.1",
+ "futures-util",
+ "http",
+ "http-body",
+ "mime",
+]
+
 [[package]]
 name = "backtrace"
 version = "0.3.64"
@@ -618,9 +577,9 @@ dependencies = [
 
 [[package]]
 name = "bitflags"
-version = "1.2.1"
+version = "1.3.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
+checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
 
 [[package]]
 name = "bitvec"
@@ -634,21 +593,6 @@ dependencies = [
  "wyz",
 ]
 
-[[package]]
-name = "blake3"
-version = "0.3.8"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b64485778c4f16a6a5a9d335e80d449ac6c70cdd6a06d2af18a6f6f775a125b3"
-dependencies = [
- "arrayref",
- "arrayvec 0.5.2",
- "cc",
- "cfg-if 0.1.10",
- "constant_time_eq",
- "crypto-mac 0.8.0",
- "digest 0.9.0",
-]
-
 [[package]]
 name = "block"
 version = "0.1.6"
@@ -702,27 +646,6 @@ dependencies = [
  "workspace",
 ]
 
-[[package]]
-name = "brotli"
-version = "3.3.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7f29919120f08613aadcd4383764e00526fc9f18b6c0895814faeed0dd78613e"
-dependencies = [
- "alloc-no-stdlib",
- "alloc-stdlib",
- "brotli-decompressor",
-]
-
-[[package]]
-name = "brotli-decompressor"
-version = "2.3.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1052e1c3b8d4d80eb84a8b94f0a1498797b5fb96314c001156a1c761940ef4ec"
-dependencies = [
- "alloc-no-stdlib",
- "alloc-stdlib",
-]
-
 [[package]]
 name = "bstr"
 version = "0.2.15"
@@ -744,16 +667,6 @@ version = "3.7.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9c59e7af012c713f529e7a3ee57ce9b31ddd858d4b512923602f74608b009631"
 
-[[package]]
-name = "byte-pool"
-version = "0.2.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f8c7230ddbb427b1094d477d821a99f3f54d36333178eeb806e279bcdcecf0ca"
-dependencies = [
- "crossbeam-queue",
- "stable_deref_trait",
-]
-
 [[package]]
 name = "bytemuck"
 version = "1.5.1"
@@ -838,7 +751,6 @@ dependencies = [
  "libc",
  "num-integer",
  "num-traits",
- "serde",
  "time 0.1.44",
  "winapi 0.3.9",
 ]
@@ -1012,10 +924,9 @@ name = "collab"
 version = "0.1.0"
 dependencies = [
  "anyhow",
- "async-io",
- "async-std",
  "async-trait",
  "async-tungstenite",
+ "axum",
  "base64 0.13.0",
  "client",
  "collections",
@@ -1039,14 +950,14 @@ dependencies = [
  "serde",
  "serde_json",
  "settings",
- "sha-1",
+ "sha-1 0.9.6",
  "sqlx",
- "surf",
  "theme",
- "tide",
- "tide-compress",
  "time 0.2.27",
+ "tokio",
+ "tokio-tungstenite",
  "toml",
+ "tower",
  "util",
  "workspace",
 ]
@@ -1096,12 +1007,6 @@ version = "0.4.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "f92cfa0fd5690b3cf8c1ef2cabbd9b7ef22fa53cf5e1f92b05103f6d5d1cf6e7"
 
-[[package]]
-name = "constant_time_eq"
-version = "0.1.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc"
-
 [[package]]
 name = "contacts_panel"
 version = "0.1.0"
@@ -1319,16 +1224,6 @@ dependencies = [
  "typenum",
 ]
 
-[[package]]
-name = "crypto-mac"
-version = "0.8.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b584a330336237c1eecd3e94266efb216c56ed91225d634cb2991c5f3fd1aeab"
-dependencies = [
- "generic-array",
- "subtle",
-]
-
 [[package]]
 name = "crypto-mac"
 version = "0.10.0"
@@ -1713,22 +1608,6 @@ dependencies = [
  "instant",
 ]
 
-[[package]]
-name = "femme"
-version = "2.1.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2af1a24f391a5a94d756db5092c6576aad494b88a71a5a36b20c67b63e0df034"
-dependencies = [
- "cfg-if 0.1.10",
- "js-sys",
- "log",
- "serde",
- "serde_derive",
- "serde_json",
- "wasm-bindgen",
- "web-sys",
-]
-
 [[package]]
 name = "file_finder"
 version = "0.1.0"
@@ -1940,9 +1819,9 @@ dependencies = [
 
 [[package]]
 name = "futures-channel"
-version = "0.3.12"
+version = "0.3.21"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f2d31b7ec7efab6eefc7c57233bb10b847986139d88cc2f5a02a1ae6871a1846"
+checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010"
 dependencies = [
  "futures-core",
  "futures-sink",
@@ -1967,9 +1846,9 @@ dependencies = [
 
 [[package]]
 name = "futures-io"
-version = "0.3.12"
+version = "0.3.21"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "28be053525281ad8259d47e4de5de657b25e7bac113458555bb4b70bc6870500"
+checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b"
 
 [[package]]
 name = "futures-lite"
@@ -1982,17 +1861,16 @@ dependencies = [
  "futures-io",
  "memchr",
  "parking",
- "pin-project-lite 0.2.4",
+ "pin-project-lite 0.2.8",
  "waker-fn",
 ]
 
 [[package]]
 name = "futures-macro"
-version = "0.3.12"
+version = "0.3.21"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c287d25add322d9f9abdcdc5927ca398917996600182178774032e9f8258fedd"
+checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512"
 dependencies = [
- "proc-macro-hack",
  "proc-macro2",
  "quote",
  "syn",
@@ -2000,21 +1878,21 @@ dependencies = [
 
 [[package]]
 name = "futures-sink"
-version = "0.3.14"
+version = "0.3.21"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5c5629433c555de3d82861a7a4e3794a4c40040390907cfbfd7143a92a426c23"
+checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868"
 
 [[package]]
 name = "futures-task"
-version = "0.3.14"
+version = "0.3.21"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ba7aa51095076f3ba6d9a1f702f74bd05ec65f555d70d2033d55ba8d69f581bc"
+checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a"
 
 [[package]]
 name = "futures-util"
-version = "0.3.12"
+version = "0.3.21"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "632a8cd0f2a4b3fdea1657f08bde063848c3bd00f9bbf6e256b8be78802e624b"
+checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a"
 dependencies = [
  "futures-channel",
  "futures-core",
@@ -2023,10 +1901,8 @@ dependencies = [
  "futures-sink",
  "futures-task",
  "memchr",
- "pin-project-lite 0.2.4",
+ "pin-project-lite 0.2.8",
  "pin-utils",
- "proc-macro-hack",
- "proc-macro-nested",
  "slab",
 ]
 
@@ -2239,6 +2115,31 @@ dependencies = [
  "hashbrown 0.11.2",
 ]
 
+[[package]]
+name = "headers"
+version = "0.3.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4cff78e5788be1e0ab65b04d306b2ed5092c815ec97ec70f4ebd5aee158aa55d"
+dependencies = [
+ "base64 0.13.0",
+ "bitflags",
+ "bytes 1.0.1",
+ "headers-core",
+ "http",
+ "httpdate",
+ "mime",
+ "sha-1 0.10.0",
+]
+
+[[package]]
+name = "headers-core"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429"
+dependencies = [
+ "http",
+]
+
 [[package]]
 name = "heck"
 version = "0.3.3"
@@ -2279,16 +2180,6 @@ dependencies = [
  "hmac 0.10.1",
 ]
 
-[[package]]
-name = "hmac"
-version = "0.8.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "126888268dcc288495a26bf004b38c5fdbb31682f992c84ceb046a1f0fe38840"
-dependencies = [
- "crypto-mac 0.8.0",
- "digest 0.9.0",
-]
-
 [[package]]
 name = "hmac"
 version = "0.10.1"
@@ -2311,13 +2202,13 @@ dependencies = [
 
 [[package]]
 name = "http"
-version = "0.2.4"
+version = "0.2.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "527e8c9ac747e28542699a951517aa9a6945af506cd1f2e1b53a576c17b6cc11"
+checksum = "31f4c6746584866f0feabcc69893c5b51beef3831656a968ed7ae254cdc4fd03"
 dependencies = [
  "bytes 1.0.1",
  "fnv",
- "itoa 0.4.7",
+ "itoa 1.0.1",
 ]
 
 [[package]]
@@ -2329,6 +2220,17 @@ dependencies = [
  "base64 0.12.3",
 ]
 
+[[package]]
+name = "http-body"
+version = "0.4.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6"
+dependencies = [
+ "bytes 1.0.1",
+ "http",
+ "pin-project-lite 0.2.8",
+]
+
 [[package]]
 name = "http-client"
 version = "6.4.1"
@@ -2344,6 +2246,12 @@ dependencies = [
  "log",
 ]
 
+[[package]]
+name = "http-range-header"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29"
+
 [[package]]
 name = "http-types"
 version = "2.11.1"
@@ -2357,7 +2265,7 @@ dependencies = [
  "cookie",
  "futures-lite",
  "infer",
- "pin-project-lite 0.2.4",
+ "pin-project-lite 0.2.8",
  "rand 0.7.3",
  "serde",
  "serde_json",
@@ -2368,9 +2276,15 @@ dependencies = [
 
 [[package]]
 name = "httparse"
-version = "1.4.1"
+version = "1.7.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f3a87b616e37e93c22fb19bcd386f02f3af5ea98a25670ad0fce773de23c5e68"
+checksum = "6330e8a36bd8c859f3fa6d9382911fbb7147ec39807f63b923933a247240b9ba"
+
+[[package]]
+name = "httpdate"
+version = "1.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421"
 
 [[package]]
 name = "humantime"
@@ -2378,6 +2292,29 @@ version = "2.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
 
+[[package]]
+name = "hyper"
+version = "0.14.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b26ae0a80afebe130861d90abf98e3814a4f28a4c6ffeb5ab8ebb2be311e0ef2"
+dependencies = [
+ "bytes 1.0.1",
+ "futures-channel",
+ "futures-core",
+ "futures-util",
+ "http",
+ "http-body",
+ "httparse",
+ "httpdate",
+ "itoa 1.0.1",
+ "pin-project-lite 0.2.8",
+ "socket2 0.4.0",
+ "tokio",
+ "tower-service",
+ "tracing",
+ "want",
+]
+
 [[package]]
 name = "idna"
 version = "0.2.3"
@@ -2480,7 +2417,7 @@ dependencies = [
  "fnv",
  "lazy_static",
  "libc",
- "mio",
+ "mio 0.6.23",
  "rand 0.7.3",
  "serde",
  "tempfile",
@@ -2844,6 +2781,12 @@ version = "0.1.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08"
 
+[[package]]
+name = "matchit"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "73cbba799671b762df5a175adf59ce145165747bb891505c43d09aefbbf38beb"
+
 [[package]]
 name = "maybe-uninit"
 version = "2.0.0"
@@ -2956,12 +2899,25 @@ dependencies = [
  "kernel32-sys",
  "libc",
  "log",
- "miow",
+ "miow 0.2.2",
  "net2",
  "slab",
  "winapi 0.2.8",
 ]
 
+[[package]]
+name = "mio"
+version = "0.7.14"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8067b404fe97c70829f082dec8bcf4f71225d7eaea1d8645349cb76fa06205cc"
+dependencies = [
+ "libc",
+ "log",
+ "miow 0.3.7",
+ "ntapi",
+ "winapi 0.3.9",
+]
+
 [[package]]
 name = "miow"
 version = "0.2.2"
@@ -2974,6 +2930,15 @@ dependencies = [
  "ws2_32-sys",
 ]
 
+[[package]]
+name = "miow"
+version = "0.3.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21"
+dependencies = [
+ "winapi 0.3.9",
+]
+
 [[package]]
 name = "multimap"
 version = "0.8.3"
@@ -3024,6 +2989,15 @@ dependencies = [
  "version_check",
 ]
 
+[[package]]
+name = "ntapi"
+version = "0.3.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c28774a7fd2fbb4f0babd8237ce554b73af68021b5f695a3cebd6c59bac0980f"
+dependencies = [
+ "winapi 0.3.9",
+]
+
 [[package]]
 name = "num-bigint"
 version = "0.4.0"
@@ -3369,9 +3343,9 @@ checksum = "257b64915a082f7811703966789728173279bdebb956b143dbcd23f6f970a777"
 
 [[package]]
 name = "pin-project-lite"
-version = "0.2.4"
+version = "0.2.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "439697af366c49a6d0a010c56a0d97685bc140ce0d377b13a2ea2aa42d64a827"
+checksum = "e280fbe77cc62c91527259e9442153f4688736748d24660126286329742b4c6c"
 
 [[package]]
 name = "pin-utils"
@@ -3493,12 +3467,6 @@ version = "0.5.19"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5"
 
-[[package]]
-name = "proc-macro-nested"
-version = "0.1.7"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bc881b2c22681370c6a780e47af9840ef841837bc98118431d4e1868bd0c1086"
-
 [[package]]
 name = "proc-macro2"
 version = "1.0.36"
@@ -3883,12 +3851,6 @@ dependencies = [
  "winapi 0.3.9",
 ]
 
-[[package]]
-name = "route-recognizer"
-version = "0.2.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "56770675ebc04927ded3e60633437841581c285dc6236109ea25fbf3beb7b59e"
-
 [[package]]
 name = "roxmltree"
 version = "0.14.1"
@@ -4334,6 +4296,17 @@ dependencies = [
  "opaque-debug",
 ]
 
+[[package]]
+name = "sha-1"
+version = "0.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "028f48d513f9678cda28f6e4064755b3fbb2af6acd672f2c209b62323f7aea0f"
+dependencies = [
+ "cfg-if 1.0.0",
+ "cpufeatures 0.2.1",
+ "digest 0.10.3",
+]
+
 [[package]]
 name = "sha1"
 version = "0.6.0"
@@ -4395,15 +4368,6 @@ version = "1.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "1ad1d488a557b235fc46dae55512ffbfc429d2482b08b4d9435ab07384ca8aec"
 
-[[package]]
-name = "simple-mutex"
-version = "1.1.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "38aabbeafa6f6dead8cebf246fe9fae1f9215c8d29b3a69f93bd62a9e4a3dcd6"
-dependencies = [
- "event-listener",
-]
-
 [[package]]
 name = "simple_asn1"
 version = "0.5.3"
@@ -4596,7 +4560,7 @@ dependencies = [
  "rustls",
  "serde",
  "serde_json",
- "sha-1",
+ "sha-1 0.9.6",
  "sha2 0.9.5",
  "smallvec",
  "sqlformat",
@@ -4641,12 +4605,6 @@ dependencies = [
  "async-std",
 ]
 
-[[package]]
-name = "stable_deref_trait"
-version = "1.2.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
-
 [[package]]
 name = "standback"
 version = "0.2.17"
@@ -4766,7 +4724,7 @@ dependencies = [
  "log",
  "mime_guess",
  "once_cell",
- "pin-project-lite 0.2.4",
+ "pin-project-lite 0.2.8",
  "serde",
  "serde_json",
  "web-sys",
@@ -4818,6 +4776,12 @@ dependencies = [
  "unicode-xid",
 ]
 
+[[package]]
+name = "sync_wrapper"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8"
+
 [[package]]
 name = "synstructure"
 version = "0.12.4"
@@ -4971,41 +4935,6 @@ dependencies = [
  "once_cell",
 ]
 
-[[package]]
-name = "tide"
-version = "0.16.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c459573f0dd2cc734b539047f57489ea875af8ee950860ded20cf93a79a1dee0"
-dependencies = [
- "async-h1",
- "async-session",
- "async-sse",
- "async-std",
- "async-trait",
- "femme",
- "futures-util",
- "http-client",
- "http-types",
- "kv-log-macro",
- "log",
- "pin-project-lite 0.2.4",
- "route-recognizer",
- "serde",
- "serde_json",
-]
-
-[[package]]
-name = "tide-compress"
-version = "0.9.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1d59e3885ecbc547a611d81e501b51bb5f52abd44c3eb3b733ac3c44ff2f2619"
-dependencies = [
- "async-compression",
- "futures-lite",
- "http-types",
- "tide",
-]
-
 [[package]]
 name = "tiff"
 version = "0.6.1"
@@ -5119,6 +5048,61 @@ version = "0.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
 
+[[package]]
+name = "tokio"
+version = "1.16.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0c27a64b625de6d309e8c57716ba93021dccf1b3b5c97edd6d3dd2d2135afc0a"
+dependencies = [
+ "bytes 1.0.1",
+ "libc",
+ "memchr",
+ "mio 0.7.14",
+ "num_cpus",
+ "once_cell",
+ "parking_lot",
+ "pin-project-lite 0.2.8",
+ "signal-hook-registry",
+ "tokio-macros",
+ "winapi 0.3.9",
+]
+
+[[package]]
+name = "tokio-macros"
+version = "1.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b557f72f448c511a979e2564e55d74e6c4432fc96ff4f6241bc6bded342643b7"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "tokio-tungstenite"
+version = "0.17.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "06cda1232a49558c46f8a504d5b93101d42c0bf7f911f12a105ba48168f821ae"
+dependencies = [
+ "futures-util",
+ "log",
+ "tokio",
+ "tungstenite 0.17.2",
+]
+
+[[package]]
+name = "tokio-util"
+version = "0.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0edfdeb067411dba2044da6d1cb2df793dd35add7888d73c16e3381ded401764"
+dependencies = [
+ "bytes 1.0.1",
+ "futures-core",
+ "futures-sink",
+ "pin-project-lite 0.2.8",
+ "tokio",
+]
+
 [[package]]
 name = "toml"
 version = "0.5.8"
@@ -5128,6 +5112,54 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "tower"
+version = "0.4.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9a89fd63ad6adf737582df5db40d286574513c69a11dac5214dc3b5603d6713e"
+dependencies = [
+ "futures-core",
+ "futures-util",
+ "pin-project",
+ "pin-project-lite 0.2.8",
+ "tokio",
+ "tokio-util",
+ "tower-layer",
+ "tower-service",
+ "tracing",
+]
+
+[[package]]
+name = "tower-http"
+version = "0.2.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "aba3f3efabf7fb41fae8534fc20a817013dd1c12cb45441efb6c82e6556b4cd8"
+dependencies = [
+ "bitflags",
+ "bytes 1.0.1",
+ "futures-core",
+ "futures-util",
+ "http",
+ "http-body",
+ "http-range-header",
+ "pin-project-lite 0.2.8",
+ "tower",
+ "tower-layer",
+ "tower-service",
+]
+
+[[package]]
+name = "tower-layer"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "343bc9466d3fe6b0f960ef45960509f84480bf4fd96f92901afe7ff3df9d3a62"
+
+[[package]]
+name = "tower-service"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6"
+
 [[package]]
 name = "tracing"
 version = "0.1.26"
@@ -5136,7 +5168,7 @@ checksum = "09adeb8c97449311ccd28a427f96fb563e7fd31aabf994189879d9da2394b89d"
 dependencies = [
  "cfg-if 1.0.0",
  "log",
- "pin-project-lite 0.2.4",
+ "pin-project-lite 0.2.8",
  "tracing-attributes",
  "tracing-core",
 ]
@@ -5248,6 +5280,12 @@ dependencies = [
  "tree-sitter",
 ]
 
+[[package]]
+name = "try-lock"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
+
 [[package]]
 name = "ttf-parser"
 version = "0.9.0"

crates/client/src/client.rs 🔗

@@ -11,7 +11,7 @@ use async_tungstenite::tungstenite::{
     error::Error as WebsocketError,
     http::{Request, StatusCode},
 };
-use futures::{future::LocalBoxFuture, FutureExt, StreamExt};
+use futures::{future::LocalBoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
 use gpui::{
     actions, AnyModelHandle, AnyViewHandle, AnyWeakModelHandle, AnyWeakViewHandle, AsyncAppContext,
     Entity, ModelContext, ModelHandle, MutableAppContext, Task, View, ViewContext, ViewHandle,
@@ -774,7 +774,7 @@ impl Client {
                 "Authorization",
                 format!("{} {}", credentials.user_id, credentials.access_token),
             )
-            .header("X-Zed-Protocol-Version", rpc::PROTOCOL_VERSION);
+            .header("x-zed-protocol-version", rpc::PROTOCOL_VERSION);
 
         let http = self.http.clone();
         cx.background().spawn(async move {
@@ -817,13 +817,21 @@ impl Client {
                     let request = request.uri(rpc_url.as_str()).body(())?;
                     let (stream, _) =
                         async_tungstenite::async_tls::client_async_tls(request, stream).await?;
-                    Ok(Connection::new(stream))
+                    Ok(Connection::new(
+                        stream
+                            .map_err(|error| anyhow!(error))
+                            .sink_map_err(|error| anyhow!(error)),
+                    ))
                 }
                 "http" => {
                     rpc_url.set_scheme("ws").unwrap();
                     let request = request.uri(rpc_url.as_str()).body(())?;
                     let (stream, _) = async_tungstenite::client_async(request, stream).await?;
-                    Ok(Connection::new(stream))
+                    Ok(Connection::new(
+                        stream
+                            .map_err(|error| anyhow!(error))
+                            .sink_map_err(|error| anyhow!(error)),
+                    ))
                 }
                 _ => Err(anyhow!("invalid rpc url: {}", rpc_url))?,
             }

crates/collab/Cargo.toml 🔗

@@ -16,15 +16,17 @@ required-features = ["seed-support"]
 collections = { path = "../collections" }
 rpc = { path = "../rpc" }
 util = { path = "../util" }
+
 anyhow = "1.0.40"
-async-io = "1.3"
-async-std = { version = "1.8.0", features = ["attributes"] }
 async-trait = "0.1.50"
 async-tungstenite = "0.16"
+axum = { version = "0.5", features = ["json", "headers", "ws"] }
 base64 = "0.13"
 envy = "0.4.2"
+env_logger = "0.8"
 futures = "0.3"
 json_env_logger = "0.1"
+lazy_static = "1.4"
 lipsum = { version = "0.8", optional = true }
 log = { version = "0.4.16", features = ["kv_unstable_serde"] }
 parking_lot = "0.11.1"
@@ -33,9 +35,9 @@ scrypt = "0.7"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
 sha-1 = "0.9"
-surf = "2.2.0"
-tide = "0.16.0"
-tide-compress = "0.9.0"
+tokio = { version = "1", features = ["full"] }
+tokio-tungstenite = "0.17"
+tower = "0.4"
 time = "0.2"
 toml = "0.5.8"
 

crates/collab/src/api.rs 🔗

@@ -1,179 +1,184 @@
-use crate::{auth, db::UserId, AppState, Request, RequestExt as _};
-use async_trait::async_trait;
-use serde::Deserialize;
-use serde_json::json;
+use crate::{
+    auth,
+    db::{User, UserId},
+    AppState, Error, Result,
+};
+use anyhow::anyhow;
+use axum::{
+    body::Body,
+    extract::{Path, Query},
+    http::{self, Request, StatusCode},
+    middleware::{self, Next},
+    response::IntoResponse,
+    routing::{get, post, put},
+    Extension, Json, Router,
+};
+use serde::{Deserialize, Serialize};
 use std::sync::Arc;
-use surf::StatusCode;
-
-pub fn add_routes(app: &mut tide::Server<Arc<AppState>>) {
-    app.at("/users").get(get_users);
-    app.at("/users").post(create_user);
-    app.at("/users/:id").put(update_user);
-    app.at("/users/:id").delete(destroy_user);
-    app.at("/users/:github_login").get(get_user);
-    app.at("/users/:github_login/access_tokens")
-        .post(create_access_token);
+use tower::ServiceBuilder;
+
+pub fn routes(state: Arc<AppState>) -> Router<Body> {
+    Router::new()
+        .route("/users", get(get_users).post(create_user))
+        .route(
+            "/users/:id",
+            put(update_user).delete(destroy_user).get(get_user),
+        )
+        .route("/users/:id/access_tokens", post(create_access_token))
+        .layer(
+            ServiceBuilder::new()
+                .layer(Extension(state))
+                .layer(middleware::from_fn(validate_api_token)),
+        )
+    // TODO: Compression on API routes?
 }
 
-async fn get_user(request: Request) -> tide::Result {
-    request.require_token().await?;
-
-    let user = request
-        .db()
-        .get_user_by_github_login(request.param("github_login")?)
-        .await?
-        .ok_or_else(|| surf::Error::from_str(404, "user not found"))?;
+pub async fn validate_api_token<B>(req: Request<B>, next: Next<B>) -> impl IntoResponse {
+    let token = req
+        .headers()
+        .get(http::header::AUTHORIZATION)
+        .and_then(|header| header.to_str().ok())
+        .ok_or_else(|| {
+            Error::Http(
+                StatusCode::BAD_REQUEST,
+                "missing authorization header".to_string(),
+            )
+        })?
+        .strip_prefix("token ")
+        .ok_or_else(|| {
+            Error::Http(
+                StatusCode::BAD_REQUEST,
+                "invalid authorization header".to_string(),
+            )
+        })?;
+
+    let state = req.extensions().get::<Arc<AppState>>().unwrap();
+
+    if token != state.api_token {
+        Err(Error::Http(
+            StatusCode::UNAUTHORIZED,
+            "invalid authorization token".to_string(),
+        ))?
+    }
 
-    Ok(tide::Response::builder(StatusCode::Ok)
-        .body(tide::Body::from_json(&user)?)
-        .build())
+    Ok::<_, Error>(next.run(req).await)
 }
 
-async fn get_users(request: Request) -> tide::Result {
-    request.require_token().await?;
-
-    let users = request.db().get_all_users().await?;
-
-    Ok(tide::Response::builder(StatusCode::Ok)
-        .body(tide::Body::from_json(&users)?)
-        .build())
+async fn get_users(Extension(app): Extension<Arc<AppState>>) -> Result<Json<Vec<User>>> {
+    let users = app.db.get_all_users().await?;
+    Ok(Json(users))
 }
 
-async fn create_user(mut request: Request) -> tide::Result {
-    request.require_token().await?;
-
-    #[derive(Deserialize)]
-    struct Params {
-        github_login: String,
-        admin: bool,
-    }
-    let params = request.body_json::<Params>().await?;
+#[derive(Deserialize)]
+struct CreateUserParams {
+    github_login: String,
+    admin: bool,
+}
 
-    let user_id = request
-        .db()
+async fn create_user(
+    Json(params): Json<CreateUserParams>,
+    Extension(app): Extension<Arc<AppState>>,
+) -> Result<Json<User>> {
+    let user_id = app
+        .db
         .create_user(&params.github_login, params.admin)
         .await?;
 
-    let user = request.db().get_user_by_id(user_id).await?.ok_or_else(|| {
-        surf::Error::from_str(
-            StatusCode::InternalServerError,
-            "couldn't find the user we just created",
-        )
-    })?;
+    let user = app
+        .db
+        .get_user_by_id(user_id)
+        .await?
+        .ok_or_else(|| anyhow!("couldn't find the user we just created"))?;
 
-    Ok(tide::Response::builder(StatusCode::Ok)
-        .body(tide::Body::from_json(&user)?)
-        .build())
+    Ok(Json(user))
 }
 
-async fn update_user(mut request: Request) -> tide::Result {
-    request.require_token().await?;
+#[derive(Deserialize)]
+struct UpdateUserParams {
+    admin: bool,
+}
 
-    #[derive(Deserialize)]
-    struct Params {
-        admin: bool,
-    }
-    let user_id = UserId(
-        request
-            .param("id")?
-            .parse::<i32>()
-            .map_err(|error| surf::Error::from_str(StatusCode::BadRequest, error.to_string()))?,
-    );
-    let params = request.body_json::<Params>().await?;
-
-    request
-        .db()
-        .set_user_is_admin(user_id, params.admin)
+async fn update_user(
+    Path(user_id): Path<i32>,
+    Json(params): Json<UpdateUserParams>,
+    Extension(app): Extension<Arc<AppState>>,
+) -> Result<()> {
+    app.db
+        .set_user_is_admin(UserId(user_id), params.admin)
         .await?;
+    Ok(())
+}
 
-    Ok(tide::Response::builder(StatusCode::Ok).build())
+async fn destroy_user(
+    Path(user_id): Path<i32>,
+    Extension(app): Extension<Arc<AppState>>,
+) -> Result<()> {
+    app.db.destroy_user(UserId(user_id)).await?;
+    Ok(())
 }
 
-async fn destroy_user(request: Request) -> tide::Result {
-    request.require_token().await?;
-    let user_id = UserId(
-        request
-            .param("id")?
-            .parse::<i32>()
-            .map_err(|error| surf::Error::from_str(StatusCode::BadRequest, error.to_string()))?,
-    );
+async fn get_user(
+    Path(login): Path<String>,
+    Extension(app): Extension<Arc<AppState>>,
+) -> Result<Json<User>> {
+    let user = app
+        .db
+        .get_user_by_github_login(&login)
+        .await?
+        .ok_or_else(|| anyhow!("user not found"))?;
+    Ok(Json(user))
+}
 
-    request.db().destroy_user(user_id).await?;
+#[derive(Deserialize)]
+struct CreateAccessTokenQueryParams {
+    public_key: String,
+    impersonate: Option<String>,
+}
 
-    Ok(tide::Response::builder(StatusCode::Ok).build())
+#[derive(Serialize)]
+struct CreateAccessTokenResponse {
+    user_id: UserId,
+    encrypted_access_token: String,
 }
 
-async fn create_access_token(request: Request) -> tide::Result {
-    request.require_token().await?;
+async fn create_access_token(
+    Path(login): Path<String>,
+    Query(params): Query<CreateAccessTokenQueryParams>,
+    Extension(app): Extension<Arc<AppState>>,
+) -> Result<Json<CreateAccessTokenResponse>> {
+    //     request.require_token().await?;
 
-    let user = request
-        .db()
-        .get_user_by_github_login(request.param("github_login")?)
+    let user = app
+        .db
+        .get_user_by_github_login(&login)
         .await?
-        .ok_or_else(|| surf::Error::from_str(StatusCode::NotFound, "user not found"))?;
-
-    #[derive(Deserialize)]
-    struct QueryParams {
-        public_key: String,
-        impersonate: Option<String>,
-    }
-
-    let query_params: QueryParams = request.query().map_err(|_| {
-        surf::Error::from_str(StatusCode::UnprocessableEntity, "invalid query params")
-    })?;
+        .ok_or_else(|| anyhow!("user not found"))?;
 
     let mut user_id = user.id;
-    if let Some(impersonate) = query_params.impersonate {
+    if let Some(impersonate) = params.impersonate {
         if user.admin {
-            if let Some(impersonated_user) =
-                request.db().get_user_by_github_login(&impersonate).await?
-            {
+            if let Some(impersonated_user) = app.db.get_user_by_github_login(&impersonate).await? {
                 user_id = impersonated_user.id;
             } else {
-                return Ok(tide::Response::builder(StatusCode::UnprocessableEntity)
-                    .body(format!(
-                        "Can't impersonate non-existent user {}",
-                        impersonate
-                    ))
-                    .build());
+                return Err(Error::Http(
+                    StatusCode::UNPROCESSABLE_ENTITY,
+                    format!("user {impersonate} does not exist"),
+                ));
             }
         } else {
-            return Ok(tide::Response::builder(StatusCode::Unauthorized)
-                .body(format!(
-                    "Can't impersonate user {} because the real user isn't an admin",
-                    impersonate
-                ))
-                .build());
+            return Err(Error::Http(
+                StatusCode::UNAUTHORIZED,
+                format!("you do not have permission to impersonate other users"),
+            ));
         }
     }
 
-    let access_token = auth::create_access_token(request.db().as_ref(), user_id).await?;
+    let access_token = auth::create_access_token(app.db.as_ref(), user_id).await?;
     let encrypted_access_token =
-        auth::encrypt_access_token(&access_token, query_params.public_key.clone())?;
+        auth::encrypt_access_token(&access_token, params.public_key.clone())?;
 
-    Ok(tide::Response::builder(StatusCode::Ok)
-        .body(json!({"user_id": user_id, "encrypted_access_token": encrypted_access_token}))
-        .build())
-}
-
-#[async_trait]
-pub trait RequestExt {
-    async fn require_token(&self) -> tide::Result<()>;
-}
-
-#[async_trait]
-impl RequestExt for Request {
-    async fn require_token(&self) -> tide::Result<()> {
-        let token = self
-            .header("Authorization")
-            .and_then(|header| header.get(0))
-            .and_then(|header| header.as_str().strip_prefix("token "))
-            .ok_or_else(|| surf::Error::from_str(403, "invalid authorization header"))?;
-
-        if token == self.state().config.api_token {
-            Ok(())
-        } else {
-            Err(tide::Error::from_str(403, "invalid authorization token"))
-        }
-    }
+    Ok(Json(CreateAccessTokenResponse {
+        user_id,
+        encrypted_access_token,
+    }))
 }

crates/collab/src/auth.rs 🔗

@@ -1,45 +1,47 @@
-use super::{
-    db::{self, UserId},
-    errors::TideResultExt,
+use std::sync::Arc;
+
+use super::db::{self, UserId};
+use crate::{AppState, Error};
+use anyhow::{Context, Result};
+use axum::{
+    http::{self, Request, StatusCode},
+    middleware::Next,
+    response::IntoResponse,
 };
-use crate::Request;
-use anyhow::{anyhow, Context};
 use rand::thread_rng;
-use rpc::auth as zed_auth;
 use scrypt::{
     password_hash::{PasswordHash, PasswordHasher, PasswordVerifier, SaltString},
     Scrypt,
 };
-use std::convert::TryFrom;
-use surf::StatusCode;
-use tide::Error;
 
-pub async fn process_auth_header(request: &Request) -> tide::Result<UserId> {
-    let mut auth_header = request
-        .header("Authorization")
+pub async fn validate_header<B>(mut req: Request<B>, next: Next<B>) -> impl IntoResponse {
+    let mut auth_header = req
+        .headers()
+        .get(http::header::AUTHORIZATION)
+        .and_then(|header| header.to_str().ok())
         .ok_or_else(|| {
-            Error::new(
-                StatusCode::BadRequest,
-                anyhow!("missing authorization header"),
+            Error::Http(
+                StatusCode::BAD_REQUEST,
+                "missing authorization header".to_string(),
             )
         })?
-        .last()
-        .as_str()
         .split_whitespace();
+
     let user_id = UserId(auth_header.next().unwrap_or("").parse().map_err(|_| {
-        Error::new(
-            StatusCode::BadRequest,
-            anyhow!("missing user id in authorization header"),
+        Error::Http(
+            StatusCode::BAD_REQUEST,
+            "missing user id in authorization header".to_string(),
         )
     })?);
+
     let access_token = auth_header.next().ok_or_else(|| {
-        Error::new(
-            StatusCode::BadRequest,
-            anyhow!("missing access token in authorization header"),
+        Error::Http(
+            StatusCode::BAD_REQUEST,
+            "missing access token in authorization header".to_string(),
         )
     })?;
 
-    let state = request.state().clone();
+    let state = req.extensions().get::<Arc<AppState>>().unwrap();
     let mut credentials_valid = false;
     for password_hash in state.db.get_access_token_hashes(user_id).await? {
         if verify_access_token(&access_token, &password_hash)? {
@@ -48,20 +50,21 @@ pub async fn process_auth_header(request: &Request) -> tide::Result<UserId> {
         }
     }
 
-    if !credentials_valid {
-        Err(Error::new(
-            StatusCode::Unauthorized,
-            anyhow!("invalid credentials"),
-        ))?;
+    if credentials_valid {
+        req.extensions_mut().insert(user_id);
+        Ok::<_, Error>(next.run(req).await)
+    } else {
+        Err(Error::Http(
+            StatusCode::UNAUTHORIZED,
+            "invalid credentials".to_string(),
+        ))
     }
-
-    Ok(user_id)
 }
 
 const MAX_ACCESS_TOKENS_TO_STORE: usize = 8;
 
-pub async fn create_access_token(db: &dyn db::Db, user_id: UserId) -> tide::Result<String> {
-    let access_token = zed_auth::random_token();
+pub async fn create_access_token(db: &dyn db::Db, user_id: UserId) -> Result<String> {
+    let access_token = rpc::auth::random_token();
     let access_token_hash =
         hash_access_token(&access_token).context("failed to hash access token")?;
     db.create_access_token_hash(user_id, &access_token_hash, MAX_ACCESS_TOKENS_TO_STORE)
@@ -69,7 +72,7 @@ pub async fn create_access_token(db: &dyn db::Db, user_id: UserId) -> tide::Resu
     Ok(access_token)
 }
 
-fn hash_access_token(token: &str) -> tide::Result<String> {
+fn hash_access_token(token: &str) -> Result<String> {
     // Avoid slow hashing in debug mode.
     let params = if cfg!(debug_assertions) {
         scrypt::Params::new(1, 1, 1).unwrap()
@@ -87,16 +90,16 @@ fn hash_access_token(token: &str) -> tide::Result<String> {
         .to_string())
 }
 
-pub fn encrypt_access_token(access_token: &str, public_key: String) -> tide::Result<String> {
+pub fn encrypt_access_token(access_token: &str, public_key: String) -> Result<String> {
     let native_app_public_key =
-        zed_auth::PublicKey::try_from(public_key).context("failed to parse app public key")?;
+        rpc::auth::PublicKey::try_from(public_key).context("failed to parse app public key")?;
     let encrypted_access_token = native_app_public_key
         .encrypt_string(&access_token)
         .context("failed to encrypt access token with public key")?;
     Ok(encrypted_access_token)
 }
 
-pub fn verify_access_token(token: &str, hash: &str) -> tide::Result<bool> {
+pub fn verify_access_token(token: &str, hash: &str) -> Result<bool> {
     let hash = PasswordHash::new(hash)?;
     Ok(Scrypt.verify_password(token.as_bytes(), &hash).is_ok())
 }

crates/collab/src/db.rs 🔗

@@ -1,26 +1,11 @@
 use anyhow::Context;
 use anyhow::Result;
-use async_std::task::{block_on, yield_now};
 use async_trait::async_trait;
 use serde::Serialize;
 pub use sqlx::postgres::PgPoolOptions as DbOptions;
 use sqlx::{types::Uuid, FromRow};
 use time::OffsetDateTime;
 
-macro_rules! test_support {
-    ($self:ident, { $($token:tt)* }) => {{
-        let body = async {
-            $($token)*
-        };
-        if $self.test_mode {
-            yield_now().await;
-            block_on(body)
-        } else {
-            body.await
-        }
-    }};
-}
-
 #[async_trait]
 pub trait Db: Send + Sync {
     async fn create_user(&self, github_login: &str, admin: bool) -> Result<UserId>;
@@ -77,20 +62,16 @@ pub trait Db: Send + Sync {
 
 pub struct PostgresDb {
     pool: sqlx::PgPool,
-    test_mode: bool,
 }
 
 impl PostgresDb {
-    pub async fn new(url: &str, max_connections: u32) -> tide::Result<Self> {
+    pub async fn new(url: &str, max_connections: u32) -> Result<Self> {
         let pool = DbOptions::new()
             .max_connections(max_connections)
             .connect(url)
             .await
             .context("failed to connect to postgres database")?;
-        Ok(Self {
-            pool,
-            test_mode: false,
-        })
+        Ok(Self { pool })
     }
 }
 
@@ -99,27 +80,23 @@ impl Db for PostgresDb {
     // users
 
     async fn create_user(&self, github_login: &str, admin: bool) -> Result<UserId> {
-        test_support!(self, {
-            let query = "
+        let query = "
                 INSERT INTO users (github_login, admin)
                 VALUES ($1, $2)
                 ON CONFLICT (github_login) DO UPDATE SET github_login = excluded.github_login
                 RETURNING id
             ";
-            Ok(sqlx::query_scalar(query)
-                .bind(github_login)
-                .bind(admin)
-                .fetch_one(&self.pool)
-                .await
-                .map(UserId)?)
-        })
+        Ok(sqlx::query_scalar(query)
+            .bind(github_login)
+            .bind(admin)
+            .fetch_one(&self.pool)
+            .await
+            .map(UserId)?)
     }
 
     async fn get_all_users(&self) -> Result<Vec<User>> {
-        test_support!(self, {
-            let query = "SELECT * FROM users ORDER BY github_login ASC";
-            Ok(sqlx::query_as(query).fetch_all(&self.pool).await?)
-        })
+        let query = "SELECT * FROM users ORDER BY github_login ASC";
+        Ok(sqlx::query_as(query).fetch_all(&self.pool).await?)
     }
 
     async fn get_user_by_id(&self, id: UserId) -> Result<Option<User>> {
@@ -129,57 +106,49 @@ impl Db for PostgresDb {
 
     async fn get_users_by_ids(&self, ids: Vec<UserId>) -> Result<Vec<User>> {
         let ids = ids.into_iter().map(|id| id.0).collect::<Vec<_>>();
-        test_support!(self, {
-            let query = "
+        let query = "
                 SELECT users.*
                 FROM users
                 WHERE users.id = ANY ($1)
             ";
 
-            Ok(sqlx::query_as(query)
-                .bind(&ids)
-                .fetch_all(&self.pool)
-                .await?)
-        })
+        Ok(sqlx::query_as(query)
+            .bind(&ids)
+            .fetch_all(&self.pool)
+            .await?)
     }
 
     async fn get_user_by_github_login(&self, github_login: &str) -> Result<Option<User>> {
-        test_support!(self, {
-            let query = "SELECT * FROM users WHERE github_login = $1 LIMIT 1";
-            Ok(sqlx::query_as(query)
-                .bind(github_login)
-                .fetch_optional(&self.pool)
-                .await?)
-        })
+        let query = "SELECT * FROM users WHERE github_login = $1 LIMIT 1";
+        Ok(sqlx::query_as(query)
+            .bind(github_login)
+            .fetch_optional(&self.pool)
+            .await?)
     }
 
     async fn set_user_is_admin(&self, id: UserId, is_admin: bool) -> Result<()> {
-        test_support!(self, {
-            let query = "UPDATE users SET admin = $1 WHERE id = $2";
-            Ok(sqlx::query(query)
-                .bind(is_admin)
-                .bind(id.0)
-                .execute(&self.pool)
-                .await
-                .map(drop)?)
-        })
+        let query = "UPDATE users SET admin = $1 WHERE id = $2";
+        Ok(sqlx::query(query)
+            .bind(is_admin)
+            .bind(id.0)
+            .execute(&self.pool)
+            .await
+            .map(drop)?)
     }
 
     async fn destroy_user(&self, id: UserId) -> Result<()> {
-        test_support!(self, {
-            let query = "DELETE FROM access_tokens WHERE user_id = $1;";
-            sqlx::query(query)
-                .bind(id.0)
-                .execute(&self.pool)
-                .await
-                .map(drop)?;
-            let query = "DELETE FROM users WHERE id = $1;";
-            Ok(sqlx::query(query)
-                .bind(id.0)
-                .execute(&self.pool)
-                .await
-                .map(drop)?)
-        })
+        let query = "DELETE FROM access_tokens WHERE user_id = $1;";
+        sqlx::query(query)
+            .bind(id.0)
+            .execute(&self.pool)
+            .await
+            .map(drop)?;
+        let query = "DELETE FROM users WHERE id = $1;";
+        Ok(sqlx::query(query)
+            .bind(id.0)
+            .execute(&self.pool)
+            .await
+            .map(drop)?)
     }
 
     // access tokens
@@ -190,12 +159,11 @@ impl Db for PostgresDb {
         access_token_hash: &str,
         max_access_token_count: usize,
     ) -> Result<()> {
-        test_support!(self, {
-            let insert_query = "
+        let insert_query = "
                 INSERT INTO access_tokens (user_id, hash)
                 VALUES ($1, $2);
             ";
-            let cleanup_query = "
+        let cleanup_query = "
                 DELETE FROM access_tokens
                 WHERE id IN (
                     SELECT id from access_tokens
@@ -205,35 +173,32 @@ impl Db for PostgresDb {
                 )
             ";
 
-            let mut tx = self.pool.begin().await?;
-            sqlx::query(insert_query)
-                .bind(user_id.0)
-                .bind(access_token_hash)
-                .execute(&mut tx)
-                .await?;
-            sqlx::query(cleanup_query)
-                .bind(user_id.0)
-                .bind(access_token_hash)
-                .bind(max_access_token_count as u32)
-                .execute(&mut tx)
-                .await?;
-            Ok(tx.commit().await?)
-        })
+        let mut tx = self.pool.begin().await?;
+        sqlx::query(insert_query)
+            .bind(user_id.0)
+            .bind(access_token_hash)
+            .execute(&mut tx)
+            .await?;
+        sqlx::query(cleanup_query)
+            .bind(user_id.0)
+            .bind(access_token_hash)
+            .bind(max_access_token_count as u32)
+            .execute(&mut tx)
+            .await?;
+        Ok(tx.commit().await?)
     }
 
     async fn get_access_token_hashes(&self, user_id: UserId) -> Result<Vec<String>> {
-        test_support!(self, {
-            let query = "
+        let query = "
                 SELECT hash
                 FROM access_tokens
                 WHERE user_id = $1
                 ORDER BY id DESC
             ";
-            Ok(sqlx::query_scalar(query)
-                .bind(user_id.0)
-                .fetch_all(&self.pool)
-                .await?)
-        })
+        Ok(sqlx::query_scalar(query)
+            .bind(user_id.0)
+            .fetch_all(&self.pool)
+            .await?)
     }
 
     // orgs
@@ -241,94 +206,83 @@ impl Db for PostgresDb {
     #[allow(unused)] // Help rust-analyzer
     #[cfg(any(test, feature = "seed-support"))]
     async fn find_org_by_slug(&self, slug: &str) -> Result<Option<Org>> {
-        test_support!(self, {
-            let query = "
+        let query = "
                 SELECT *
                 FROM orgs
                 WHERE slug = $1
             ";
-            Ok(sqlx::query_as(query)
-                .bind(slug)
-                .fetch_optional(&self.pool)
-                .await?)
-        })
+        Ok(sqlx::query_as(query)
+            .bind(slug)
+            .fetch_optional(&self.pool)
+            .await?)
     }
 
     #[cfg(any(test, feature = "seed-support"))]
     async fn create_org(&self, name: &str, slug: &str) -> Result<OrgId> {
-        test_support!(self, {
-            let query = "
+        let query = "
                 INSERT INTO orgs (name, slug)
                 VALUES ($1, $2)
                 RETURNING id
             ";
-            Ok(sqlx::query_scalar(query)
-                .bind(name)
-                .bind(slug)
-                .fetch_one(&self.pool)
-                .await
-                .map(OrgId)?)
-        })
+        Ok(sqlx::query_scalar(query)
+            .bind(name)
+            .bind(slug)
+            .fetch_one(&self.pool)
+            .await
+            .map(OrgId)?)
     }
 
     #[cfg(any(test, feature = "seed-support"))]
     async fn add_org_member(&self, org_id: OrgId, user_id: UserId, is_admin: bool) -> Result<()> {
-        test_support!(self, {
-            let query = "
+        let query = "
                 INSERT INTO org_memberships (org_id, user_id, admin)
                 VALUES ($1, $2, $3)
                 ON CONFLICT DO NOTHING
             ";
-            Ok(sqlx::query(query)
-                .bind(org_id.0)
-                .bind(user_id.0)
-                .bind(is_admin)
-                .execute(&self.pool)
-                .await
-                .map(drop)?)
-        })
+        Ok(sqlx::query(query)
+            .bind(org_id.0)
+            .bind(user_id.0)
+            .bind(is_admin)
+            .execute(&self.pool)
+            .await
+            .map(drop)?)
     }
 
     // channels
 
     #[cfg(any(test, feature = "seed-support"))]
     async fn create_org_channel(&self, org_id: OrgId, name: &str) -> Result<ChannelId> {
-        test_support!(self, {
-            let query = "
+        let query = "
                 INSERT INTO channels (owner_id, owner_is_user, name)
                 VALUES ($1, false, $2)
                 RETURNING id
             ";
-            Ok(sqlx::query_scalar(query)
-                .bind(org_id.0)
-                .bind(name)
-                .fetch_one(&self.pool)
-                .await
-                .map(ChannelId)?)
-        })
+        Ok(sqlx::query_scalar(query)
+            .bind(org_id.0)
+            .bind(name)
+            .fetch_one(&self.pool)
+            .await
+            .map(ChannelId)?)
     }
 
     #[allow(unused)] // Help rust-analyzer
     #[cfg(any(test, feature = "seed-support"))]
     async fn get_org_channels(&self, org_id: OrgId) -> Result<Vec<Channel>> {
-        test_support!(self, {
-            let query = "
+        let query = "
                 SELECT *
                 FROM channels
                 WHERE
                     channels.owner_is_user = false AND
                     channels.owner_id = $1
             ";
-            Ok(sqlx::query_as(query)
-                .bind(org_id.0)
-                .fetch_all(&self.pool)
-                .await?)
-        })
+        Ok(sqlx::query_as(query)
+            .bind(org_id.0)
+            .fetch_all(&self.pool)
+            .await?)
     }
 
     async fn get_accessible_channels(&self, user_id: UserId) -> Result<Vec<Channel>> {
-        test_support!(self, {
-            let query = "
+        let query = "
                 SELECT
                     channels.*
                 FROM
@@ -337,11 +291,10 @@ impl Db for PostgresDb {
                     channel_memberships.user_id = $1 AND
                     channel_memberships.channel_id = channels.id
             ";
-            Ok(sqlx::query_as(query)
-                .bind(user_id.0)
-                .fetch_all(&self.pool)
-                .await?)
-        })
+        Ok(sqlx::query_as(query)
+            .bind(user_id.0)
+            .fetch_all(&self.pool)
+            .await?)
     }
 
     async fn can_user_access_channel(
@@ -349,20 +302,18 @@ impl Db for PostgresDb {
         user_id: UserId,
         channel_id: ChannelId,
     ) -> Result<bool> {
-        test_support!(self, {
-            let query = "
+        let query = "
                 SELECT id
                 FROM channel_memberships
                 WHERE user_id = $1 AND channel_id = $2
                 LIMIT 1
             ";
-            Ok(sqlx::query_scalar::<_, i32>(query)
-                .bind(user_id.0)
-                .bind(channel_id.0)
-                .fetch_optional(&self.pool)
-                .await
-                .map(|e| e.is_some())?)
-        })
+        Ok(sqlx::query_scalar::<_, i32>(query)
+            .bind(user_id.0)
+            .bind(channel_id.0)
+            .fetch_optional(&self.pool)
+            .await
+            .map(|e| e.is_some())?)
     }
 
     #[cfg(any(test, feature = "seed-support"))]
@@ -372,20 +323,18 @@ impl Db for PostgresDb {
         user_id: UserId,
         is_admin: bool,
     ) -> Result<()> {
-        test_support!(self, {
-            let query = "
+        let query = "
                 INSERT INTO channel_memberships (channel_id, user_id, admin)
                 VALUES ($1, $2, $3)
                 ON CONFLICT DO NOTHING
             ";
-            Ok(sqlx::query(query)
-                .bind(channel_id.0)
-                .bind(user_id.0)
-                .bind(is_admin)
-                .execute(&self.pool)
-                .await
-                .map(drop)?)
-        })
+        Ok(sqlx::query(query)
+            .bind(channel_id.0)
+            .bind(user_id.0)
+            .bind(is_admin)
+            .execute(&self.pool)
+            .await
+            .map(drop)?)
     }
 
     // messages
@@ -398,23 +347,21 @@ impl Db for PostgresDb {
         timestamp: OffsetDateTime,
         nonce: u128,
     ) -> Result<MessageId> {
-        test_support!(self, {
-            let query = "
+        let query = "
                 INSERT INTO channel_messages (channel_id, sender_id, body, sent_at, nonce)
                 VALUES ($1, $2, $3, $4, $5)
                 ON CONFLICT (nonce) DO UPDATE SET nonce = excluded.nonce
                 RETURNING id
             ";
-            Ok(sqlx::query_scalar(query)
-                .bind(channel_id.0)
-                .bind(sender_id.0)
-                .bind(body)
-                .bind(timestamp)
-                .bind(Uuid::from_u128(nonce))
-                .fetch_one(&self.pool)
-                .await
-                .map(MessageId)?)
-        })
+        Ok(sqlx::query_scalar(query)
+            .bind(channel_id.0)
+            .bind(sender_id.0)
+            .bind(body)
+            .bind(timestamp)
+            .bind(Uuid::from_u128(nonce))
+            .fetch_one(&self.pool)
+            .await
+            .map(MessageId)?)
     }
 
     async fn get_channel_messages(
@@ -423,8 +370,7 @@ impl Db for PostgresDb {
         count: usize,
         before_id: Option<MessageId>,
     ) -> Result<Vec<ChannelMessage>> {
-        test_support!(self, {
-            let query = r#"
+        let query = r#"
                 SELECT * FROM (
                     SELECT
                         id, channel_id, sender_id, body, sent_at AT TIME ZONE 'UTC' as sent_at, nonce
@@ -438,35 +384,32 @@ impl Db for PostgresDb {
                 ) as recent_messages
                 ORDER BY id ASC
             "#;
-            Ok(sqlx::query_as(query)
-                .bind(channel_id.0)
-                .bind(before_id.unwrap_or(MessageId::MAX))
-                .bind(count as i64)
-                .fetch_all(&self.pool)
-                .await?)
-        })
+        Ok(sqlx::query_as(query)
+            .bind(channel_id.0)
+            .bind(before_id.unwrap_or(MessageId::MAX))
+            .bind(count as i64)
+            .fetch_all(&self.pool)
+            .await?)
     }
 
     #[cfg(test)]
     async fn teardown(&self, name: &str, url: &str) {
         use util::ResultExt;
 
-        test_support!(self, {
-            let query = "
+        let query = "
                 SELECT pg_terminate_backend(pg_stat_activity.pid)
                 FROM pg_stat_activity
                 WHERE pg_stat_activity.datname = '{}' AND pid <> pg_backend_pid();
             ";
-            sqlx::query(query)
-                .bind(name)
-                .execute(&self.pool)
-                .await
-                .log_err();
-            self.pool.close().await;
-            <sqlx::Postgres as sqlx::migrate::MigrateDatabase>::drop_database(url)
-                .await
-                .log_err();
-        })
+        sqlx::query(query)
+            .bind(name)
+            .execute(&self.pool)
+            .await
+            .log_err();
+        self.pool.close().await;
+        <sqlx::Postgres as sqlx::migrate::MigrateDatabase>::drop_database(url)
+            .await
+            .log_err();
     }
 }
 
@@ -704,12 +647,11 @@ pub mod tests {
             let name = format!("zed-test-{}", rng.gen::<u128>());
             let url = format!("postgres://postgres@localhost/{}", name);
             let migrations_path = Path::new(concat!(env!("CARGO_MANIFEST_DIR"), "/migrations"));
-            let db = block_on(async {
+            let db = futures::executor::block_on(async {
                 Postgres::create_database(&url)
                     .await
                     .expect("failed to create test db");
-                let mut db = PostgresDb::new(&url, 5).await.unwrap();
-                db.test_mode = true;
+                let db = PostgresDb::new(&url, 5).await.unwrap();
                 let migrator = Migrator::new(migrations_path).await.unwrap();
                 migrator.run(&db.pool).await.unwrap();
                 db
@@ -737,7 +679,7 @@ pub mod tests {
     impl Drop for TestDb {
         fn drop(&mut self) {
             if let Some(db) = self.db.take() {
-                block_on(db.teardown(&self.name, &self.url));
+                futures::executor::block_on(db.teardown(&self.name, &self.url));
             }
         }
     }

crates/collab/src/main.rs 🔗

@@ -2,18 +2,16 @@ mod api;
 mod auth;
 mod db;
 mod env;
-mod errors;
 mod rpc;
 
-use ::rpc::Peer;
-use async_std::net::TcpListener;
-use async_trait::async_trait;
+use axum::{body::Body, http::StatusCode, response::IntoResponse, Router};
 use db::{Db, PostgresDb};
-use serde::Deserialize;
-use std::sync::Arc;
-use tide_compress::CompressMiddleware;
 
-type Request = tide::Request<Arc<AppState>>;
+use serde::Deserialize;
+use std::{
+    net::{SocketAddr, TcpListener},
+    sync::Arc,
+};
 
 #[derive(Default, Deserialize)]
 pub struct Config {
@@ -24,39 +22,26 @@ pub struct Config {
 
 pub struct AppState {
     db: Arc<dyn Db>,
-    config: Config,
+    api_token: String,
 }
 
 impl AppState {
-    async fn new(config: Config) -> tide::Result<Arc<Self>> {
+    async fn new(config: &Config) -> Result<Arc<Self>> {
         let db = PostgresDb::new(&config.database_url, 5).await?;
-
         let this = Self {
             db: Arc::new(db),
-            config,
+            api_token: config.api_token.clone(),
         };
         Ok(Arc::new(this))
     }
 }
 
-#[async_trait]
-trait RequestExt {
-    fn db(&self) -> &Arc<dyn Db>;
-}
-
-#[async_trait]
-impl RequestExt for Request {
-    fn db(&self) -> &Arc<dyn Db> {
-        &self.state().db
-    }
-}
-
-#[async_std::main]
-async fn main() -> tide::Result<()> {
+#[tokio::main]
+async fn main() -> Result<()> {
     if std::env::var("LOG_JSON").is_ok() {
         json_env_logger::init();
     } else {
-        tide::log::start();
+        env_logger::init();
     }
 
     if let Err(error) = env::load_dotenv() {
@@ -67,32 +52,63 @@ async fn main() -> tide::Result<()> {
     }
 
     let config = envy::from_env::<Config>().expect("error loading config");
-    let state = AppState::new(config).await?;
-    let rpc = Peer::new();
-    run_server(
-        state.clone(),
-        rpc,
-        TcpListener::bind(&format!("0.0.0.0:{}", state.config.http_port)).await?,
-    )
-    .await?;
+    let state = AppState::new(&config).await?;
+
+    let listener = TcpListener::bind(&format!("0.0.0.0:{}", config.http_port))
+        .expect("failed to bind TCP listener");
+
+    let app = Router::<Body>::new()
+        .merge(api::routes(state.clone()))
+        .merge(rpc::routes(state));
+
+    axum::Server::from_tcp(listener)?
+        .serve(app.into_make_service_with_connect_info::<SocketAddr>())
+        .await?;
+
     Ok(())
 }
 
-pub async fn run_server(
-    state: Arc<AppState>,
-    rpc: Arc<Peer>,
-    listener: TcpListener,
-) -> tide::Result<()> {
-    let mut app = tide::with_state(state.clone());
-    rpc::add_routes(&mut app, &rpc);
+pub type Result<T, E = Error> = std::result::Result<T, E>;
 
-    let mut web = tide::with_state(state.clone());
-    web.with(CompressMiddleware::new());
-    api::add_routes(&mut web);
+pub enum Error {
+    Http(StatusCode, String),
+    Internal(anyhow::Error),
+}
 
-    app.at("/").nest(web);
+impl<E> From<E> for Error
+where
+    E: Into<anyhow::Error>,
+{
+    fn from(error: E) -> Self {
+        Self::Internal(error.into())
+    }
+}
 
-    app.listen(listener).await?;
+impl IntoResponse for Error {
+    fn into_response(self) -> axum::response::Response {
+        match self {
+            Error::Http(code, message) => (code, message).into_response(),
+            Error::Internal(error) => {
+                (StatusCode::INTERNAL_SERVER_ERROR, format!("{}", &error)).into_response()
+            }
+        }
+    }
+}
 
-    Ok(())
+impl std::fmt::Debug for Error {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Error::Http(code, message) => (code, message).fmt(f),
+            Error::Internal(error) => error.fmt(f),
+        }
+    }
+}
+
+impl std::fmt::Display for Error {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Error::Http(code, message) => write!(f, "{code}: {message}"),
+            Error::Internal(error) => error.fmt(f),
+        }
+    }
 }

crates/collab/src/rpc.rs 🔗

@@ -1,47 +1,56 @@
 mod store;
 
-use super::{
-    auth::process_auth_header,
+use crate::{
+    auth,
     db::{ChannelId, MessageId, UserId},
-    AppState,
+    AppState, Result,
 };
 use anyhow::anyhow;
-use async_io::Timer;
-use async_std::{
-    sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
-    task,
+use async_tungstenite::tungstenite::{
+    protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
+};
+use axum::{
+    body::Body,
+    extract::{
+        ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
+        ConnectInfo, WebSocketUpgrade,
+    },
+    headers::{Header, HeaderName},
+    http::StatusCode,
+    middleware,
+    response::{IntoResponse, Response},
+    routing::get,
+    Extension, Router, TypedHeader,
 };
-use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
 use collections::{HashMap, HashSet};
-use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
+use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
+use lazy_static::lazy_static;
 use log::{as_debug, as_display};
 use rpc::{
     proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
     Connection, ConnectionId, Peer, TypedEnvelope,
 };
-use sha1::{Digest as _, Sha1};
 use std::{
     any::TypeId,
     future::Future,
     marker::PhantomData,
+    net::SocketAddr,
     ops::{Deref, DerefMut},
     rc::Rc,
     sync::Arc,
     time::{Duration, Instant},
 };
 use store::{Store, Worktree};
-use surf::StatusCode;
-use tide::{
-    http::headers::{HeaderName, CONNECTION, UPGRADE},
-    Request, Response,
-};
 use time::OffsetDateTime;
+use tokio::{
+    sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
+    time::Sleep,
+};
+use tower::ServiceBuilder;
 use util::ResultExt;
 
 type MessageHandler = Box<
-    dyn Send
-        + Sync
-        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
+    dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, Result<()>>,
 >;
 
 pub struct Server {
@@ -53,9 +62,9 @@ pub struct Server {
 }
 
 pub trait Executor: Send + Clone {
-    type Timer: Send + Future;
+    type Sleep: Send + Future;
     fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
-    fn timer(&self, duration: Duration) -> Self::Timer;
+    fn sleep(&self, duration: Duration) -> Self::Sleep;
 }
 
 #[derive(Clone)]
@@ -77,11 +86,10 @@ struct StoreWriteGuard<'a> {
 impl Server {
     pub fn new(
         app_state: Arc<AppState>,
-        peer: Arc<Peer>,
         notifications: Option<mpsc::UnboundedSender<()>>,
     ) -> Arc<Self> {
         let mut server = Self {
-            peer,
+            peer: Peer::new(),
             app_state,
             store: Default::default(),
             handlers: Default::default(),
@@ -141,7 +149,7 @@ impl Server {
     fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
     where
         F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
-        Fut: 'static + Send + Future<Output = tide::Result<()>>,
+        Fut: 'static + Send + Future<Output = Result<()>>,
         M: EnvelopedMessage,
     {
         let prev_handler = self.handlers.insert(
@@ -160,7 +168,7 @@ impl Server {
     fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
     where
         F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
-        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
+        Fut: 'static + Send + Future<Output = Result<M::Response>>,
         M: RequestMessage,
     {
         self.add_message_handler(move |server, envelope| {
@@ -193,7 +201,7 @@ impl Server {
         F: 'static
             + Send
             + Sync
-            + Fn(Arc<Self>, &mut Store, TypedEnvelope<M>) -> tide::Result<M::Response>,
+            + Fn(Arc<Self>, &mut Store, TypedEnvelope<M>) -> Result<M::Response>,
         M: RequestMessage,
     {
         let handler = Arc::new(handler);
@@ -237,7 +245,7 @@ impl Server {
                 .add_connection(connection, {
                     let executor = executor.clone();
                     move |duration| {
-                        let timer = executor.timer(duration);
+                        let timer = executor.sleep(duration);
                         async move {
                             timer.await;
                         }
@@ -308,7 +316,7 @@ impl Server {
         }
     }
 
-    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
+    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
         self.peer.disconnect(connection_id);
         let mut state = self.state_mut().await;
         let removed_connection = state.remove_connection(connection_id)?;
@@ -342,14 +350,14 @@ impl Server {
         Ok(())
     }
 
-    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
+    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> Result<proto::Ack> {
         Ok(proto::Ack {})
     }
 
     async fn register_project(
         self: Arc<Server>,
         request: TypedEnvelope<proto::RegisterProject>,
-    ) -> tide::Result<proto::RegisterProjectResponse> {
+    ) -> Result<proto::RegisterProjectResponse> {
         let project_id = {
             let mut state = self.state_mut().await;
             let user_id = state.user_id_for_connection(request.sender_id)?;
@@ -361,7 +369,7 @@ impl Server {
     async fn unregister_project(
         self: Arc<Server>,
         request: TypedEnvelope<proto::UnregisterProject>,
-    ) -> tide::Result<()> {
+    ) -> Result<()> {
         let mut state = self.state_mut().await;
         let project = state.unregister_project(request.payload.project_id, request.sender_id)?;
         self.update_contacts_for_users(&*state, &project.authorized_user_ids());
@@ -371,7 +379,7 @@ impl Server {
     async fn share_project(
         self: Arc<Server>,
         request: TypedEnvelope<proto::ShareProject>,
-    ) -> tide::Result<proto::Ack> {
+    ) -> Result<proto::Ack> {
         let mut state = self.state_mut().await;
         let project = state.share_project(request.payload.project_id, request.sender_id)?;
         self.update_contacts_for_users(&mut *state, &project.authorized_user_ids);
@@ -381,7 +389,7 @@ impl Server {
     async fn unshare_project(
         self: Arc<Server>,
         request: TypedEnvelope<proto::UnshareProject>,
-    ) -> tide::Result<()> {
+    ) -> Result<()> {
         let project_id = request.payload.project_id;
         let mut state = self.state_mut().await;
         let project = state.unshare_project(project_id, request.sender_id)?;
@@ -397,7 +405,7 @@ impl Server {
         self: Arc<Server>,
         state: &mut Store,
         request: TypedEnvelope<proto::JoinProject>,
-    ) -> tide::Result<proto::JoinProjectResponse> {
+    ) -> Result<proto::JoinProjectResponse> {
         let project_id = request.payload.project_id;
 
         let user_id = state.user_id_for_connection(request.sender_id)?;
@@ -470,7 +478,7 @@ impl Server {
     async fn leave_project(
         self: Arc<Server>,
         request: TypedEnvelope<proto::LeaveProject>,
-    ) -> tide::Result<()> {
+    ) -> Result<()> {
         let sender_id = request.sender_id;
         let project_id = request.payload.project_id;
         let mut state = self.state_mut().await;
@@ -491,7 +499,7 @@ impl Server {
     async fn register_worktree(
         self: Arc<Server>,
         request: TypedEnvelope<proto::RegisterWorktree>,
-    ) -> tide::Result<proto::Ack> {
+    ) -> Result<proto::Ack> {
         let mut contact_user_ids = HashSet::default();
         for github_login in &request.payload.authorized_logins {
             let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
@@ -528,7 +536,7 @@ impl Server {
     async fn unregister_worktree(
         self: Arc<Server>,
         request: TypedEnvelope<proto::UnregisterWorktree>,
-    ) -> tide::Result<()> {
+    ) -> Result<()> {
         let project_id = request.payload.project_id;
         let worktree_id = request.payload.worktree_id;
         let mut state = self.state_mut().await;
@@ -550,7 +558,7 @@ impl Server {
     async fn update_worktree(
         self: Arc<Server>,
         request: TypedEnvelope<proto::UpdateWorktree>,
-    ) -> tide::Result<proto::Ack> {
+    ) -> Result<proto::Ack> {
         let connection_ids = self.state_mut().await.update_worktree(
             request.sender_id,
             request.payload.project_id,
@@ -570,7 +578,7 @@ impl Server {
     async fn update_diagnostic_summary(
         self: Arc<Server>,
         request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
-    ) -> tide::Result<()> {
+    ) -> Result<()> {
         let summary = request
             .payload
             .summary
@@ -593,7 +601,7 @@ impl Server {
     async fn start_language_server(
         self: Arc<Server>,
         request: TypedEnvelope<proto::StartLanguageServer>,
-    ) -> tide::Result<()> {
+    ) -> Result<()> {
         let receiver_ids = self.state_mut().await.start_language_server(
             request.payload.project_id,
             request.sender_id,
@@ -613,7 +621,7 @@ impl Server {
     async fn update_language_server(
         self: Arc<Server>,
         request: TypedEnvelope<proto::UpdateLanguageServer>,
-    ) -> tide::Result<()> {
+    ) -> Result<()> {
         let receiver_ids = self
             .state()
             .await
@@ -628,7 +636,7 @@ impl Server {
     async fn forward_project_request<T>(
         self: Arc<Server>,
         request: TypedEnvelope<T>,
-    ) -> tide::Result<T::Response>
+    ) -> Result<T::Response>
     where
         T: EntityMessage + RequestMessage,
     {
@@ -646,7 +654,7 @@ impl Server {
     async fn save_buffer(
         self: Arc<Server>,
         request: TypedEnvelope<proto::SaveBuffer>,
-    ) -> tide::Result<proto::BufferSaved> {
+    ) -> Result<proto::BufferSaved> {
         let host = self
             .state()
             .await
@@ -673,7 +681,7 @@ impl Server {
     async fn update_buffer(
         self: Arc<Server>,
         request: TypedEnvelope<proto::UpdateBuffer>,
-    ) -> tide::Result<proto::Ack> {
+    ) -> Result<proto::Ack> {
         let receiver_ids = self
             .state()
             .await
@@ -688,7 +696,7 @@ impl Server {
     async fn update_buffer_file(
         self: Arc<Server>,
         request: TypedEnvelope<proto::UpdateBufferFile>,
-    ) -> tide::Result<()> {
+    ) -> Result<()> {
         let receiver_ids = self
             .state()
             .await
@@ -703,7 +711,7 @@ impl Server {
     async fn buffer_reloaded(
         self: Arc<Server>,
         request: TypedEnvelope<proto::BufferReloaded>,
-    ) -> tide::Result<()> {
+    ) -> Result<()> {
         let receiver_ids = self
             .state()
             .await
@@ -718,7 +726,7 @@ impl Server {
     async fn buffer_saved(
         self: Arc<Server>,
         request: TypedEnvelope<proto::BufferSaved>,
-    ) -> tide::Result<()> {
+    ) -> Result<()> {
         let receiver_ids = self
             .state()
             .await
@@ -733,7 +741,7 @@ impl Server {
     async fn follow(
         self: Arc<Self>,
         request: TypedEnvelope<proto::Follow>,
-    ) -> tide::Result<proto::FollowResponse> {
+    ) -> Result<proto::FollowResponse> {
         let leader_id = ConnectionId(request.payload.leader_id);
         let follower_id = request.sender_id;
         if !self
@@ -754,10 +762,7 @@ impl Server {
         Ok(response)
     }
 
-    async fn unfollow(
-        self: Arc<Self>,
-        request: TypedEnvelope<proto::Unfollow>,
-    ) -> tide::Result<()> {
+    async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
         let leader_id = ConnectionId(request.payload.leader_id);
         if !self
             .state()
@@ -775,7 +780,7 @@ impl Server {
     async fn update_followers(
         self: Arc<Self>,
         request: TypedEnvelope<proto::UpdateFollowers>,
-    ) -> tide::Result<()> {
+    ) -> Result<()> {
         let connection_ids = self
             .state()
             .await
@@ -802,7 +807,7 @@ impl Server {
     async fn get_channels(
         self: Arc<Server>,
         request: TypedEnvelope<proto::GetChannels>,
-    ) -> tide::Result<proto::GetChannelsResponse> {
+    ) -> Result<proto::GetChannelsResponse> {
         let user_id = self
             .state()
             .await
@@ -822,7 +827,7 @@ impl Server {
     async fn get_users(
         self: Arc<Server>,
         request: TypedEnvelope<proto::GetUsers>,
-    ) -> tide::Result<proto::GetUsersResponse> {
+    ) -> Result<proto::GetUsersResponse> {
         let user_ids = request
             .payload
             .user_ids
@@ -867,7 +872,7 @@ impl Server {
     async fn join_channel(
         self: Arc<Self>,
         request: TypedEnvelope<proto::JoinChannel>,
-    ) -> tide::Result<proto::JoinChannelResponse> {
+    ) -> Result<proto::JoinChannelResponse> {
         let user_id = self
             .state()
             .await
@@ -908,7 +913,7 @@ impl Server {
     async fn leave_channel(
         self: Arc<Self>,
         request: TypedEnvelope<proto::LeaveChannel>,
-    ) -> tide::Result<()> {
+    ) -> Result<()> {
         let user_id = self
             .state()
             .await
@@ -933,7 +938,7 @@ impl Server {
     async fn send_channel_message(
         self: Arc<Self>,
         request: TypedEnvelope<proto::SendChannelMessage>,
-    ) -> tide::Result<proto::SendChannelMessageResponse> {
+    ) -> Result<proto::SendChannelMessageResponse> {
         let channel_id = ChannelId::from_proto(request.payload.channel_id);
         let user_id;
         let connection_ids;
@@ -988,7 +993,7 @@ impl Server {
     async fn get_channel_messages(
         self: Arc<Self>,
         request: TypedEnvelope<proto::GetChannelMessages>,
-    ) -> tide::Result<proto::GetChannelMessagesResponse> {
+    ) -> Result<proto::GetChannelMessagesResponse> {
         let user_id = self
             .state()
             .await
@@ -1030,10 +1035,10 @@ impl Server {
 
     async fn state<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
         #[cfg(test)]
-        async_std::task::yield_now().await;
+        tokio::task::yield_now().await;
         let guard = self.store.read().await;
         #[cfg(test)]
-        async_std::task::yield_now().await;
+        tokio::task::yield_now().await;
         StoreReadGuard {
             guard,
             _not_send: PhantomData,
@@ -1042,10 +1047,10 @@ impl Server {
 
     async fn state_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
         #[cfg(test)]
-        async_std::task::yield_now().await;
+        tokio::task::yield_now().await;
         let guard = self.store.write().await;
         #[cfg(test)]
-        async_std::task::yield_now().await;
+        tokio::task::yield_now().await;
         StoreWriteGuard {
             guard,
             _not_send: PhantomData,
@@ -1083,14 +1088,14 @@ impl<'a> Drop for StoreWriteGuard<'a> {
 }
 
 impl Executor for RealExecutor {
-    type Timer = Timer;
+    type Sleep = Sleep;
 
     fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
-        task::spawn(future);
+        tokio::task::spawn(future);
     }
 
-    fn timer(&self, duration: Duration) -> Self::Timer {
-        Timer::after(duration)
+    fn sleep(&self, duration: Duration) -> Self::Sleep {
+        tokio::time::sleep(duration)
     }
 }
 
@@ -1105,75 +1110,100 @@ where
     }
 }
 
-pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
-    let server = Server::new(app.state().clone(), rpc.clone(), None);
-    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
-        let server = server.clone();
-        async move {
-            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
+lazy_static! {
+    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
+}
 
-            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
-            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
-            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
-            let client_protocol_version: Option<u32> = request
-                .header("X-Zed-Protocol-Version")
-                .and_then(|v| v.as_str().parse().ok());
+pub struct ProtocolVersion(u32);
 
-            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
-                return Ok(Response::new(StatusCode::UpgradeRequired));
-            }
+impl Header for ProtocolVersion {
+    fn name() -> &'static HeaderName {
+        &ZED_PROTOCOL_VERSION
+    }
 
-            let header = match request.header("Sec-Websocket-Key") {
-                Some(h) => h.as_str(),
-                None => return Err(anyhow!("expected sec-websocket-key"))?,
-            };
+    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
+    where
+        Self: Sized,
+        I: Iterator<Item = &'i axum::http::HeaderValue>,
+    {
+        let version = values
+            .next()
+            .ok_or_else(|| axum::headers::Error::invalid())?
+            .to_str()
+            .map_err(|_| axum::headers::Error::invalid())?
+            .parse()
+            .map_err(|_| axum::headers::Error::invalid())?;
+        Ok(Self(version))
+    }
 
-            let user_id = process_auth_header(&request).await?;
-
-            let mut response = Response::new(StatusCode::SwitchingProtocols);
-            response.insert_header(UPGRADE, "websocket");
-            response.insert_header(CONNECTION, "Upgrade");
-            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
-            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
-            response.insert_header("Sec-Websocket-Version", "13");
-
-            let http_res: &mut tide::http::Response = response.as_mut();
-            let upgrade_receiver = http_res.recv_upgrade().await;
-            let addr = request.remote().unwrap_or("unknown").to_string();
-            task::spawn(async move {
-                if let Some(stream) = upgrade_receiver.await {
-                    server
-                        .handle_connection(
-                            Connection::new(
-                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
-                            ),
-                            addr,
-                            user_id,
-                            None,
-                            RealExecutor,
-                        )
-                        .await;
-                }
-            });
+    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
+        values.extend([self.0.to_string().parse().unwrap()]);
+    }
+}
 
-            Ok(response)
-        }
-    });
+pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
+    let server = Server::new(app_state.clone(), None);
+    Router::new()
+        .route("/rpc", get(handle_websocket_request))
+        .layer(
+            ServiceBuilder::new()
+                .layer(Extension(app_state))
+                .layer(middleware::from_fn(auth::validate_header))
+                .layer(Extension(server)),
+        )
 }
 
-fn header_contains_ignore_case<T>(
-    request: &tide::Request<T>,
-    header_name: HeaderName,
-    value: &str,
-) -> bool {
-    request
-        .header(header_name)
-        .map(|h| {
-            h.as_str()
-                .split(',')
-                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
-        })
-        .unwrap_or(false)
+pub async fn handle_websocket_request(
+    TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
+    ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
+    Extension(server): Extension<Arc<Server>>,
+    Extension(user_id): Extension<UserId>,
+    ws: WebSocketUpgrade,
+) -> Response {
+    if protocol_version != rpc::PROTOCOL_VERSION {
+        return (
+            StatusCode::UPGRADE_REQUIRED,
+            "client must be upgraded".to_string(),
+        )
+            .into_response();
+    }
+    let socket_address = socket_address.to_string();
+    ws.on_upgrade(move |socket| {
+        let socket = socket
+            .map_ok(to_tungstenite_message)
+            .err_into()
+            .with(|message| async move { Ok(to_axum_message(message)) });
+        let connection = Connection::new(Box::pin(socket));
+        server.handle_connection(connection, socket_address, user_id, None, RealExecutor)
+    })
+}
+
+fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
+    match message {
+        TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
+        TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
+        TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
+        TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
+        TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
+            code: frame.code.into(),
+            reason: frame.reason,
+        })),
+    }
+}
+
+fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
+    match message {
+        AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
+        AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
+        AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
+        AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
+        AxumMessage::Close(frame) => {
+            TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
+                code: frame.code.into(),
+                reason: frame.reason,
+            }))
+        }
+    }
 }
 
 #[cfg(test)]
@@ -1181,7 +1211,7 @@ mod tests {
     use super::*;
     use crate::{
         db::{tests::TestDb, UserId},
-        AppState, Config,
+        AppState,
     };
     use ::rpc::Peer;
     use client::{
@@ -5621,7 +5651,7 @@ mod tests {
             let app_state = Self::build_app_state(&test_db).await;
             let peer = Peer::new();
             let notifications = mpsc::unbounded();
-            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
+            let server = Server::new(app_state.clone(), Some(notifications.0));
             Self {
                 peer,
                 app_state,
@@ -5736,11 +5766,9 @@ mod tests {
         }
 
         async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
-            let mut config = Config::default();
-            config.database_url = test_db.url.clone();
             Arc::new(AppState {
                 db: test_db.db().clone(),
-                config,
+                api_token: Default::default(),
             })
         }
 
@@ -5752,15 +5780,15 @@ mod tests {
         where
             F: FnMut(&Store) -> bool,
         {
-            async_std::future::timeout(Duration::from_millis(500), async {
-                while !(predicate)(&*self.server.store.read().await) {
-                    self.foreground.start_waiting();
-                    self.notifications.next().await;
-                    self.foreground.finish_waiting();
-                }
-            })
-            .await
-            .expect("condition timed out");
+            assert!(
+                self.foreground.parking_forbidden(),
+                "you must call forbid_parking to use server conditions so we don't block indefinitely"
+            );
+            while !(predicate)(&*self.server.store.read().await) {
+                self.foreground.start_waiting();
+                self.notifications.next().await;
+                self.foreground.finish_waiting();
+            }
         }
     }
 
@@ -6325,13 +6353,13 @@ mod tests {
     }
 
     impl Executor for Arc<gpui::executor::Background> {
-        type Timer = gpui::executor::Timer;
+        type Sleep = gpui::executor::Timer;
 
         fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
             self.spawn(future).detach();
         }
 
-        fn timer(&self, duration: Duration) -> Self::Timer {
+        fn sleep(&self, duration: Duration) -> Self::Sleep {
             self.as_ref().timer(duration)
         }
     }

crates/collab/src/rpc/store.rs 🔗

@@ -1,5 +1,5 @@
 use crate::db::{ChannelId, UserId};
-use anyhow::anyhow;
+use anyhow::{anyhow, Result};
 use collections::{BTreeMap, HashMap, HashSet};
 use rpc::{proto, ConnectionId};
 use std::{collections::hash_map, path::PathBuf};
@@ -99,7 +99,7 @@ impl Store {
     pub fn remove_connection(
         &mut self,
         connection_id: ConnectionId,
-    ) -> tide::Result<RemovedConnectionState> {
+    ) -> Result<RemovedConnectionState> {
         let connection = if let Some(connection) = self.connections.remove(&connection_id) {
             connection
         } else {
@@ -165,7 +165,7 @@ impl Store {
         }
     }
 
-    pub fn user_id_for_connection(&self, connection_id: ConnectionId) -> tide::Result<UserId> {
+    pub fn user_id_for_connection(&self, connection_id: ConnectionId) -> Result<UserId> {
         Ok(self
             .connections
             .get(&connection_id)
@@ -258,7 +258,7 @@ impl Store {
         worktree_id: u64,
         connection_id: ConnectionId,
         worktree: Worktree,
-    ) -> tide::Result<()> {
+    ) -> Result<()> {
         let project = self
             .projects
             .get_mut(&project_id)
@@ -286,7 +286,7 @@ impl Store {
         &mut self,
         project_id: u64,
         connection_id: ConnectionId,
-    ) -> tide::Result<Project> {
+    ) -> Result<Project> {
         match self.projects.entry(project_id) {
             hash_map::Entry::Occupied(e) => {
                 if e.get().host_connection_id == connection_id {
@@ -326,7 +326,7 @@ impl Store {
         project_id: u64,
         worktree_id: u64,
         acting_connection_id: ConnectionId,
-    ) -> tide::Result<(Worktree, Vec<ConnectionId>)> {
+    ) -> Result<(Worktree, Vec<ConnectionId>)> {
         let project = self
             .projects
             .get_mut(&project_id)
@@ -363,7 +363,7 @@ impl Store {
         &mut self,
         project_id: u64,
         connection_id: ConnectionId,
-    ) -> tide::Result<SharedProject> {
+    ) -> Result<SharedProject> {
         if let Some(project) = self.projects.get_mut(&project_id) {
             if project.host_connection_id == connection_id {
                 let mut share = ProjectShare::default();
@@ -383,7 +383,7 @@ impl Store {
         &mut self,
         project_id: u64,
         acting_connection_id: ConnectionId,
-    ) -> tide::Result<UnsharedProject> {
+    ) -> Result<UnsharedProject> {
         let project = if let Some(project) = self.projects.get_mut(&project_id) {
             project
         } else {
@@ -418,7 +418,7 @@ impl Store {
         worktree_id: u64,
         connection_id: ConnectionId,
         summary: proto::DiagnosticSummary,
-    ) -> tide::Result<Vec<ConnectionId>> {
+    ) -> Result<Vec<ConnectionId>> {
         let project = self
             .projects
             .get_mut(&project_id)
@@ -443,7 +443,7 @@ impl Store {
         project_id: u64,
         connection_id: ConnectionId,
         language_server: proto::LanguageServer,
-    ) -> tide::Result<Vec<ConnectionId>> {
+    ) -> Result<Vec<ConnectionId>> {
         let project = self
             .projects
             .get_mut(&project_id)
@@ -461,7 +461,7 @@ impl Store {
         connection_id: ConnectionId,
         user_id: UserId,
         project_id: u64,
-    ) -> tide::Result<JoinedProject> {
+    ) -> Result<JoinedProject> {
         let connection = self
             .connections
             .get_mut(&connection_id)
@@ -498,7 +498,7 @@ impl Store {
         &mut self,
         connection_id: ConnectionId,
         project_id: u64,
-    ) -> tide::Result<LeftProject> {
+    ) -> Result<LeftProject> {
         let project = self
             .projects
             .get_mut(&project_id)
@@ -533,7 +533,7 @@ impl Store {
         worktree_id: u64,
         removed_entries: &[u64],
         updated_entries: &[proto::Entry],
-    ) -> tide::Result<Vec<ConnectionId>> {
+    ) -> Result<Vec<ConnectionId>> {
         let project = self.write_project(project_id, connection_id)?;
         let worktree = project
             .share_mut()?
@@ -554,13 +554,13 @@ impl Store {
         &self,
         project_id: u64,
         acting_connection_id: ConnectionId,
-    ) -> tide::Result<Vec<ConnectionId>> {
+    ) -> Result<Vec<ConnectionId>> {
         Ok(self
             .read_project(project_id, acting_connection_id)?
             .connection_ids())
     }
 
-    pub fn channel_connection_ids(&self, channel_id: ChannelId) -> tide::Result<Vec<ConnectionId>> {
+    pub fn channel_connection_ids(&self, channel_id: ChannelId) -> Result<Vec<ConnectionId>> {
         Ok(self
             .channels
             .get(&channel_id)
@@ -573,11 +573,7 @@ impl Store {
         self.projects.get(&project_id)
     }
 
-    pub fn read_project(
-        &self,
-        project_id: u64,
-        connection_id: ConnectionId,
-    ) -> tide::Result<&Project> {
+    pub fn read_project(&self, project_id: u64, connection_id: ConnectionId) -> Result<&Project> {
         let project = self
             .projects
             .get(&project_id)
@@ -600,7 +596,7 @@ impl Store {
         &mut self,
         project_id: u64,
         connection_id: ConnectionId,
-    ) -> tide::Result<&mut Project> {
+    ) -> Result<&mut Project> {
         let project = self
             .projects
             .get_mut(&project_id)
@@ -755,14 +751,14 @@ impl Project {
         }
     }
 
-    pub fn share(&self) -> tide::Result<&ProjectShare> {
+    pub fn share(&self) -> Result<&ProjectShare> {
         Ok(self
             .share
             .as_ref()
             .ok_or_else(|| anyhow!("worktree is not shared"))?)
     }
 
-    fn share_mut(&mut self) -> tide::Result<&mut ProjectShare> {
+    fn share_mut(&mut self) -> Result<&mut ProjectShare> {
         Ok(self
             .share
             .as_mut()

crates/rpc/src/conn.rs 🔗

@@ -1,14 +1,14 @@
-use async_tungstenite::tungstenite::{Error as WebSocketError, Message as WebSocketMessage};
+use async_tungstenite::tungstenite::Message as WebSocketMessage;
 use futures::{SinkExt as _, StreamExt as _};
 
 pub struct Connection {
     pub(crate) tx:
-        Box<dyn 'static + Send + Unpin + futures::Sink<WebSocketMessage, Error = WebSocketError>>,
+        Box<dyn 'static + Send + Unpin + futures::Sink<WebSocketMessage, Error = anyhow::Error>>,
     pub(crate) rx: Box<
         dyn 'static
             + Send
             + Unpin
-            + futures::Stream<Item = Result<WebSocketMessage, WebSocketError>>,
+            + futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>>,
     >,
 }
 
@@ -18,8 +18,8 @@ impl Connection {
         S: 'static
             + Send
             + Unpin
-            + futures::Sink<WebSocketMessage, Error = WebSocketError>
-            + futures::Stream<Item = Result<WebSocketMessage, WebSocketError>>,
+            + futures::Sink<WebSocketMessage, Error = anyhow::Error>
+            + futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>>,
     {
         let (tx, rx) = stream.split();
         Self {
@@ -28,7 +28,7 @@ impl Connection {
         }
     }
 
-    pub async fn send(&mut self, message: WebSocketMessage) -> Result<(), WebSocketError> {
+    pub async fn send(&mut self, message: WebSocketMessage) -> Result<(), anyhow::Error> {
         self.tx.send(message).await
     }
 
@@ -54,40 +54,37 @@ impl Connection {
             killed: Arc<AtomicBool>,
             executor: Arc<gpui::executor::Background>,
         ) -> (
-            Box<dyn Send + Unpin + futures::Sink<WebSocketMessage, Error = WebSocketError>>,
-            Box<
-                dyn Send + Unpin + futures::Stream<Item = Result<WebSocketMessage, WebSocketError>>,
-            >,
+            Box<dyn Send + Unpin + futures::Sink<WebSocketMessage, Error = anyhow::Error>>,
+            Box<dyn Send + Unpin + futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>>>,
         ) {
+            use anyhow::anyhow;
             use futures::channel::mpsc;
             use std::io::{Error, ErrorKind};
 
             let (tx, rx) = mpsc::unbounded::<WebSocketMessage>();
 
-            let tx = tx
-                .sink_map_err(|e| WebSocketError::from(Error::new(ErrorKind::Other, e)))
-                .with({
+            let tx = tx.sink_map_err(|error| anyhow!(error)).with({
+                let killed = killed.clone();
+                let executor = Arc::downgrade(&executor);
+                move |msg| {
                     let killed = killed.clone();
-                    let executor = Arc::downgrade(&executor);
-                    move |msg| {
-                        let killed = killed.clone();
-                        let executor = executor.clone();
-                        Box::pin(async move {
-                            if let Some(executor) = executor.upgrade() {
-                                executor.simulate_random_delay().await;
-                            }
+                    let executor = executor.clone();
+                    Box::pin(async move {
+                        if let Some(executor) = executor.upgrade() {
+                            executor.simulate_random_delay().await;
+                        }
 
-                            // Writes to a half-open TCP connection will error.
-                            if killed.load(SeqCst) {
-                                std::io::Result::Err(
-                                    Error::new(ErrorKind::Other, "connection lost").into(),
-                                )?;
-                            }
+                        // Writes to a half-open TCP connection will error.
+                        if killed.load(SeqCst) {
+                            std::io::Result::Err(
+                                Error::new(ErrorKind::Other, "connection lost").into(),
+                            )?;
+                        }
 
-                            Ok(msg)
-                        })
-                    }
-                });
+                        Ok(msg)
+                    })
+                }
+            });
 
             let rx = rx.then({
                 let killed = killed.clone();

crates/rpc/src/proto.rs 🔗

@@ -1,6 +1,6 @@
 use super::{ConnectionId, PeerId, TypedEnvelope};
-use anyhow::Result;
-use async_tungstenite::tungstenite::{Error as WebSocketError, Message as WebSocketMessage};
+use anyhow::{anyhow, Result};
+use async_tungstenite::tungstenite::Message as WebSocketMessage;
 use futures::{SinkExt as _, StreamExt as _};
 use prost::Message as _;
 use std::any::{Any, TypeId};
@@ -318,9 +318,9 @@ impl<S> MessageStream<S> {
 
 impl<S> MessageStream<S>
 where
-    S: futures::Sink<WebSocketMessage, Error = WebSocketError> + Unpin,
+    S: futures::Sink<WebSocketMessage, Error = anyhow::Error> + Unpin,
 {
-    pub async fn write(&mut self, message: Message) -> Result<(), WebSocketError> {
+    pub async fn write(&mut self, message: Message) -> Result<(), anyhow::Error> {
         #[cfg(any(test, feature = "test-support"))]
         const COMPRESSION_LEVEL: i32 = -7;
 
@@ -357,9 +357,9 @@ where
 
 impl<S> MessageStream<S>
 where
-    S: futures::Stream<Item = Result<WebSocketMessage, WebSocketError>> + Unpin,
+    S: futures::Stream<Item = Result<WebSocketMessage, anyhow::Error>> + Unpin,
 {
-    pub async fn read(&mut self) -> Result<Message, WebSocketError> {
+    pub async fn read(&mut self) -> Result<Message, anyhow::Error> {
         while let Some(bytes) = self.stream.next().await {
             match bytes? {
                 WebSocketMessage::Binary(bytes) => {
@@ -375,7 +375,7 @@ where
                 _ => {}
             }
         }
-        Err(WebSocketError::ConnectionClosed)
+        Err(anyhow!("connection closed"))
     }
 }