Establish WebSocket connection to Cloud (#35734)

Richard Feldman and Marshall Bowers created

This PR adds a new WebSocket connection to Cloud.

This connection will be used to push down notifications from the server
to the client.

Release Notes:

- N/A

---------

Co-authored-by: Marshall Bowers <git@maxdeviant.com>

Change summary

Cargo.lock                                       | 48 ++++++++++
Cargo.toml                                       |  3 
crates/client/src/client.rs                      | 35 ++++++++
crates/cloud_api_client/Cargo.toml               |  3 
crates/cloud_api_client/src/cloud_api_client.rs  | 43 ++++++++++
crates/cloud_api_client/src/websocket.rs         | 73 ++++++++++++++++++
crates/cloud_api_types/src/websocket_protocol.rs |  2 
tooling/workspace-hack/Cargo.toml                | 24 ++--
8 files changed, 214 insertions(+), 17 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -1411,7 +1411,7 @@ dependencies = [
  "anyhow",
  "arrayvec",
  "log",
- "nom",
+ "nom 7.1.3",
  "num-rational",
  "v_frame",
 ]
@@ -2785,7 +2785,7 @@ version = "0.6.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766"
 dependencies = [
- "nom",
+ "nom 7.1.3",
 ]
 
 [[package]]
@@ -3071,10 +3071,13 @@ dependencies = [
  "anyhow",
  "cloud_api_types",
  "futures 0.3.31",
+ "gpui",
+ "gpui_tokio",
  "http_client",
  "parking_lot",
  "serde_json",
  "workspace-hack",
+ "yawc",
 ]
 
 [[package]]
@@ -10582,6 +10585,15 @@ dependencies = [
  "minimal-lexical",
 ]
 
+[[package]]
+name = "nom"
+version = "8.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "df9761775871bdef83bee530e60050f7e54b1105350d6884eb0fb4f46c2f9405"
+dependencies = [
+ "memchr",
+]
+
 [[package]]
 name = "noop_proc_macro"
 version = "0.3.0"
@@ -15403,7 +15415,7 @@ version = "0.2.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7bba3a93db0cc4f7bdece8bb09e77e2e785c20bfebf79eb8340ed80708048790"
 dependencies = [
- "nom",
+ "nom 7.1.3",
  "unicode_categories",
 ]
 
@@ -19979,7 +19991,7 @@ dependencies = [
  "naga",
  "nix 0.28.0",
  "nix 0.29.0",
- "nom",
+ "nom 7.1.3",
  "num-bigint",
  "num-bigint-dig",
  "num-integer",
@@ -20314,6 +20326,34 @@ version = "1.0.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "cfe53a6657fd280eaa890a3bc59152892ffa3e30101319d168b781ed6529b049"
 
+[[package]]
+name = "yawc"
+version = "0.2.4"
+source = "git+https://github.com/deviant-forks/yawc?rev=1899688f3e69ace4545aceb97b2a13881cf26142#1899688f3e69ace4545aceb97b2a13881cf26142"
+dependencies = [
+ "base64 0.22.1",
+ "bytes 1.10.1",
+ "flate2",
+ "futures 0.3.31",
+ "http-body-util",
+ "hyper 1.6.0",
+ "hyper-util",
+ "js-sys",
+ "nom 8.0.0",
+ "pin-project",
+ "rand 0.8.5",
+ "sha1",
+ "thiserror 1.0.69",
+ "tokio",
+ "tokio-rustls 0.26.2",
+ "tokio-util",
+ "url",
+ "wasm-bindgen",
+ "wasm-bindgen-futures",
+ "web-sys",
+ "webpki-roots",
+]
+
 [[package]]
 name = "yazi"
 version = "0.2.1"

Cargo.toml 🔗

@@ -661,6 +661,9 @@ which = "6.0.0"
 windows-core = "0.61"
 wit-component = "0.221"
 workspace-hack = "0.1.0"
+# We can switch back to the published version once https://github.com/infinitefield/yawc/pull/16 is merged and a new
+# version is released.
+yawc = { git = "https://github.com/deviant-forks/yawc", rev = "1899688f3e69ace4545aceb97b2a13881cf26142" }
 zstd = "0.11"
 
 [workspace.dependencies.async-stripe]

crates/client/src/client.rs 🔗

@@ -14,6 +14,7 @@ use async_tungstenite::tungstenite::{
 };
 use clock::SystemClock;
 use cloud_api_client::CloudApiClient;
+use cloud_api_client::websocket_protocol::MessageToClient;
 use credentials_provider::CredentialsProvider;
 use futures::{
     AsyncReadExt, FutureExt, SinkExt, Stream, StreamExt, TryFutureExt as _, TryStreamExt,
@@ -933,6 +934,32 @@ impl Client {
         }
     }
 
+    /// Establishes a WebSocket connection with Cloud for receiving updates from the server.
+    async fn connect_to_cloud(self: &Arc<Self>, cx: &AsyncApp) -> Result<()> {
+        let connect_task = cx.update({
+            let cloud_client = self.cloud_client.clone();
+            move |cx| cloud_client.connect(cx)
+        })??;
+        let connection = connect_task.await?;
+
+        let (mut messages, task) = cx.update(|cx| connection.spawn(cx))?;
+        task.detach();
+
+        cx.spawn({
+            let this = self.clone();
+            async move |cx| {
+                while let Some(message) = messages.next().await {
+                    if let Some(message) = message.log_err() {
+                        this.handle_message_to_client(message, cx);
+                    }
+                }
+            }
+        })
+        .detach();
+
+        Ok(())
+    }
+
     /// Performs a sign-in and also connects to Collab.
     ///
     /// This is called in places where we *don't* need to connect in the future. We will replace these calls with calls
@@ -944,6 +971,8 @@ impl Client {
     ) -> Result<()> {
         let credentials = self.sign_in(try_provider, cx).await?;
 
+        self.connect_to_cloud(cx).await.log_err();
+
         let connect_result = match self.connect_with_credentials(credentials, cx).await {
             ConnectionResult::Timeout => Err(anyhow!("connection timed out")),
             ConnectionResult::ConnectionReset => Err(anyhow!("connection reset")),
@@ -1622,6 +1651,12 @@ impl Client {
         }
     }
 
+    fn handle_message_to_client(self: &Arc<Client>, message: MessageToClient, _cx: &AsyncApp) {
+        match message {
+            MessageToClient::UserUpdated => {}
+        }
+    }
+
     pub fn telemetry(&self) -> &Arc<Telemetry> {
         &self.telemetry
     }

crates/cloud_api_client/Cargo.toml 🔗

@@ -15,7 +15,10 @@ path = "src/cloud_api_client.rs"
 anyhow.workspace = true
 cloud_api_types.workspace = true
 futures.workspace = true
+gpui.workspace = true
+gpui_tokio.workspace = true
 http_client.workspace = true
 parking_lot.workspace = true
 serde_json.workspace = true
 workspace-hack.workspace = true
+yawc.workspace = true

crates/cloud_api_client/src/cloud_api_client.rs 🔗

@@ -1,11 +1,19 @@
+mod websocket;
+
 use std::sync::Arc;
 
 use anyhow::{Context, Result, anyhow};
+use cloud_api_types::websocket_protocol::{PROTOCOL_VERSION, PROTOCOL_VERSION_HEADER_NAME};
 pub use cloud_api_types::*;
 use futures::AsyncReadExt as _;
+use gpui::{App, Task};
+use gpui_tokio::Tokio;
 use http_client::http::request;
 use http_client::{AsyncBody, HttpClientWithUrl, Method, Request, StatusCode};
 use parking_lot::RwLock;
+use yawc::WebSocket;
+
+use crate::websocket::Connection;
 
 struct Credentials {
     user_id: u32,
@@ -78,6 +86,41 @@ impl CloudApiClient {
         Ok(serde_json::from_str(&body)?)
     }
 
+    pub fn connect(&self, cx: &App) -> Result<Task<Result<Connection>>> {
+        let mut connect_url = self
+            .http_client
+            .build_zed_cloud_url("/client/users/connect", &[])?;
+        connect_url
+            .set_scheme(match connect_url.scheme() {
+                "https" => "wss",
+                "http" => "ws",
+                scheme => Err(anyhow!("invalid URL scheme: {scheme}"))?,
+            })
+            .map_err(|_| anyhow!("failed to set URL scheme"))?;
+
+        let credentials = self.credentials.read();
+        let credentials = credentials.as_ref().context("no credentials provided")?;
+        let authorization_header = format!("{} {}", credentials.user_id, credentials.access_token);
+
+        Ok(cx.spawn(async move |cx| {
+            let handle = cx
+                .update(|cx| Tokio::handle(cx))
+                .ok()
+                .context("failed to get Tokio handle")?;
+            let _guard = handle.enter();
+
+            let ws = WebSocket::connect(connect_url)
+                .with_request(
+                    request::Builder::new()
+                        .header("Authorization", authorization_header)
+                        .header(PROTOCOL_VERSION_HEADER_NAME, PROTOCOL_VERSION.to_string()),
+                )
+                .await?;
+
+            Ok(Connection::new(ws))
+        }))
+    }
+
     pub async fn accept_terms_of_service(&self) -> Result<AcceptTermsOfServiceResponse> {
         let request = self.build_request(
             Request::builder().method(Method::POST).uri(

crates/cloud_api_client/src/websocket.rs 🔗

@@ -0,0 +1,73 @@
+use std::pin::Pin;
+use std::time::Duration;
+
+use anyhow::Result;
+use cloud_api_types::websocket_protocol::MessageToClient;
+use futures::channel::mpsc::unbounded;
+use futures::stream::{SplitSink, SplitStream};
+use futures::{FutureExt as _, SinkExt as _, Stream, StreamExt as _, TryStreamExt as _, pin_mut};
+use gpui::{App, BackgroundExecutor, Task};
+use yawc::WebSocket;
+use yawc::frame::{FrameView, OpCode};
+
+const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);
+
+pub type MessageStream = Pin<Box<dyn Stream<Item = Result<MessageToClient>>>>;
+
+pub struct Connection {
+    tx: SplitSink<WebSocket, FrameView>,
+    rx: SplitStream<WebSocket>,
+}
+
+impl Connection {
+    pub fn new(ws: WebSocket) -> Self {
+        let (tx, rx) = ws.split();
+
+        Self { tx, rx }
+    }
+
+    pub fn spawn(self, cx: &App) -> (MessageStream, Task<()>) {
+        let (mut tx, rx) = (self.tx, self.rx);
+
+        let (message_tx, message_rx) = unbounded();
+
+        let handle_io = |executor: BackgroundExecutor| async move {
+            // Send messages on this frequency so the connection isn't closed.
+            let keepalive_timer = executor.timer(KEEPALIVE_INTERVAL).fuse();
+            futures::pin_mut!(keepalive_timer);
+
+            let rx = rx.fuse();
+            pin_mut!(rx);
+
+            loop {
+                futures::select_biased! {
+                    _ = keepalive_timer => {
+                        let _ = tx.send(FrameView::ping(Vec::new())).await;
+
+                        keepalive_timer.set(executor.timer(KEEPALIVE_INTERVAL).fuse());
+                    }
+                    frame = rx.next() => {
+                        let Some(frame) = frame else {
+                            break;
+                        };
+
+                        match frame.opcode {
+                            OpCode::Binary => {
+                                let message_result = MessageToClient::deserialize(&frame.payload);
+                                message_tx.unbounded_send(message_result).ok();
+                            }
+                            OpCode::Close => {
+                                break;
+                            }
+                            _ => {}
+                        }
+                    }
+                }
+            }
+        };
+
+        let task = cx.spawn(async move |cx| handle_io(cx.background_executor().clone()).await);
+
+        (message_rx.into_stream().boxed(), task)
+    }
+}

crates/cloud_api_types/src/websocket_protocol.rs 🔗

@@ -8,7 +8,7 @@ pub const PROTOCOL_VERSION: u32 = 0;
 pub const PROTOCOL_VERSION_HEADER_NAME: &str = "x-zed-protocol-version";
 
 /// A message from Cloud to the Zed client.
-#[derive(Serialize, Deserialize)]
+#[derive(Debug, Serialize, Deserialize)]
 pub enum MessageToClient {
     /// The user was updated and should be refreshed.
     UserUpdated,

tooling/workspace-hack/Cargo.toml 🔗

@@ -305,7 +305,7 @@ scopeguard = { version = "1" }
 security-framework = { version = "3", features = ["OSX_10_14"] }
 security-framework-sys = { version = "2", features = ["OSX_10_14"] }
 sync_wrapper = { version = "1", default-features = false, features = ["futures"] }
-tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] }
+tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring"] }
 tokio-socks = { version = "0.5", features = ["futures-io"] }
 tokio-stream = { version = "0.1", features = ["fs"] }
 tower = { version = "0.5", default-features = false, features = ["timeout", "util"] }
@@ -334,7 +334,7 @@ scopeguard = { version = "1" }
 security-framework = { version = "3", features = ["OSX_10_14"] }
 security-framework-sys = { version = "2", features = ["OSX_10_14"] }
 sync_wrapper = { version = "1", default-features = false, features = ["futures"] }
-tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] }
+tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring"] }
 tokio-socks = { version = "0.5", features = ["futures-io"] }
 tokio-stream = { version = "0.1", features = ["fs"] }
 tower = { version = "0.5", default-features = false, features = ["timeout", "util"] }
@@ -362,7 +362,7 @@ scopeguard = { version = "1" }
 security-framework = { version = "3", features = ["OSX_10_14"] }
 security-framework-sys = { version = "2", features = ["OSX_10_14"] }
 sync_wrapper = { version = "1", default-features = false, features = ["futures"] }
-tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] }
+tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring"] }
 tokio-socks = { version = "0.5", features = ["futures-io"] }
 tokio-stream = { version = "0.1", features = ["fs"] }
 tower = { version = "0.5", default-features = false, features = ["timeout", "util"] }
@@ -391,7 +391,7 @@ scopeguard = { version = "1" }
 security-framework = { version = "3", features = ["OSX_10_14"] }
 security-framework-sys = { version = "2", features = ["OSX_10_14"] }
 sync_wrapper = { version = "1", default-features = false, features = ["futures"] }
-tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] }
+tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring"] }
 tokio-socks = { version = "0.5", features = ["futures-io"] }
 tokio-stream = { version = "0.1", features = ["fs"] }
 tower = { version = "0.5", default-features = false, features = ["timeout", "util"] }
@@ -429,7 +429,7 @@ rustix-dff4ba8e3ae991db = { package = "rustix", version = "1", features = ["fs",
 scopeguard = { version = "1" }
 syn-f595c2ba2a3f28df = { package = "syn", version = "2", features = ["extra-traits", "fold", "full", "visit", "visit-mut"] }
 sync_wrapper = { version = "1", default-features = false, features = ["futures"] }
-tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] }
+tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring"] }
 tokio-socks = { version = "0.5", features = ["futures-io"] }
 tokio-stream = { version = "0.1", features = ["fs"] }
 toml_datetime = { version = "0.6", default-features = false, features = ["serde"] }
@@ -468,7 +468,7 @@ rustix-d585fab2519d2d1 = { package = "rustix", version = "0.38", features = ["ev
 rustix-dff4ba8e3ae991db = { package = "rustix", version = "1", features = ["fs", "net", "process", "termios", "time"] }
 scopeguard = { version = "1" }
 sync_wrapper = { version = "1", default-features = false, features = ["futures"] }
-tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] }
+tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring"] }
 tokio-socks = { version = "0.5", features = ["futures-io"] }
 tokio-stream = { version = "0.1", features = ["fs"] }
 toml_datetime = { version = "0.6", default-features = false, features = ["serde"] }
@@ -509,7 +509,7 @@ rustix-dff4ba8e3ae991db = { package = "rustix", version = "1", features = ["fs",
 scopeguard = { version = "1" }
 syn-f595c2ba2a3f28df = { package = "syn", version = "2", features = ["extra-traits", "fold", "full", "visit", "visit-mut"] }
 sync_wrapper = { version = "1", default-features = false, features = ["futures"] }
-tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] }
+tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring"] }
 tokio-socks = { version = "0.5", features = ["futures-io"] }
 tokio-stream = { version = "0.1", features = ["fs"] }
 toml_datetime = { version = "0.6", default-features = false, features = ["serde"] }
@@ -548,7 +548,7 @@ rustix-d585fab2519d2d1 = { package = "rustix", version = "0.38", features = ["ev
 rustix-dff4ba8e3ae991db = { package = "rustix", version = "1", features = ["fs", "net", "process", "termios", "time"] }
 scopeguard = { version = "1" }
 sync_wrapper = { version = "1", default-features = false, features = ["futures"] }
-tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] }
+tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring"] }
 tokio-socks = { version = "0.5", features = ["futures-io"] }
 tokio-stream = { version = "0.1", features = ["fs"] }
 toml_datetime = { version = "0.6", default-features = false, features = ["serde"] }
@@ -568,7 +568,7 @@ ring = { version = "0.17", features = ["std"] }
 rustix-d585fab2519d2d1 = { package = "rustix", version = "0.38", features = ["event"] }
 scopeguard = { version = "1" }
 sync_wrapper = { version = "1", default-features = false, features = ["futures"] }
-tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] }
+tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring"] }
 tokio-socks = { version = "0.5", features = ["futures-io"] }
 tokio-stream = { version = "0.1", features = ["fs"] }
 tower = { version = "0.5", default-features = false, features = ["timeout", "util"] }
@@ -592,7 +592,7 @@ ring = { version = "0.17", features = ["std"] }
 rustix-d585fab2519d2d1 = { package = "rustix", version = "0.38", features = ["event"] }
 scopeguard = { version = "1" }
 sync_wrapper = { version = "1", default-features = false, features = ["futures"] }
-tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] }
+tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring"] }
 tokio-socks = { version = "0.5", features = ["futures-io"] }
 tokio-stream = { version = "0.1", features = ["fs"] }
 tower = { version = "0.5", default-features = false, features = ["timeout", "util"] }
@@ -636,7 +636,7 @@ rustix-dff4ba8e3ae991db = { package = "rustix", version = "1", features = ["fs",
 scopeguard = { version = "1" }
 syn-f595c2ba2a3f28df = { package = "syn", version = "2", features = ["extra-traits", "fold", "full", "visit", "visit-mut"] }
 sync_wrapper = { version = "1", default-features = false, features = ["futures"] }
-tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] }
+tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring"] }
 tokio-socks = { version = "0.5", features = ["futures-io"] }
 tokio-stream = { version = "0.1", features = ["fs"] }
 toml_datetime = { version = "0.6", default-features = false, features = ["serde"] }
@@ -675,7 +675,7 @@ rustix-d585fab2519d2d1 = { package = "rustix", version = "0.38", features = ["ev
 rustix-dff4ba8e3ae991db = { package = "rustix", version = "1", features = ["fs", "net", "process", "termios", "time"] }
 scopeguard = { version = "1" }
 sync_wrapper = { version = "1", default-features = false, features = ["futures"] }
-tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] }
+tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "ring"] }
 tokio-socks = { version = "0.5", features = ["futures-io"] }
 tokio-stream = { version = "0.1", features = ["fs"] }
 toml_datetime = { version = "0.6", default-features = false, features = ["serde"] }