Merge pull request #1227 from zed-industries/maintain-keepalive-under-high-message-volume

Antonio Scandurra created

Maintain keepalive under high message volume

Change summary

Cargo.lock                  |  2 ++
crates/gpui/src/executor.rs |  5 +----
crates/rpc/Cargo.toml       |  2 ++
crates/rpc/src/peer.rs      | 37 ++++++++++++++++++++++---------------
4 files changed, 27 insertions(+), 19 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -3826,6 +3826,8 @@ dependencies = [
  "base64 0.13.0",
  "clock",
  "collections",
+ "ctor",
+ "env_logger",
  "futures",
  "gpui",
  "parking_lot 0.11.2",

crates/gpui/src/executor.rs 🔗

@@ -532,10 +532,7 @@ impl Foreground {
     #[cfg(any(test, feature = "test-support"))]
     pub fn advance_clock(&self, duration: Duration) {
         match self {
-            Self::Deterministic { executor, .. } => {
-                executor.run_until_parked();
-                executor.advance_clock(duration);
-            }
+            Self::Deterministic { executor, .. } => executor.advance_clock(duration),
             _ => panic!("this method can only be called on a deterministic executor"),
         }
     }

crates/rpc/Cargo.toml 🔗

@@ -38,3 +38,5 @@ collections = { path = "../collections", features = ["test-support"] }
 gpui = { path = "../gpui", features = ["test-support"] }
 smol = "1.2.5"
 tempdir = "0.3.7"
+ctor = "0.1"
+env_logger = "0.9"

crates/rpc/src/peer.rs 🔗

@@ -194,6 +194,21 @@ impl Peer {
                                 return Ok(())
                             },
                         },
+                        _ = keepalive_timer => {
+                            tracing::debug!(%connection_id, "keepalive interval: pinging");
+                            futures::select_biased! {
+                                result = writer.write(proto::Message::Ping).fuse() => {
+                                    tracing::debug!(%connection_id, "keepalive interval: done pinging");
+                                    result.context("failed to send keepalive")?;
+                                    tracing::debug!(%connection_id, "keepalive interval: resetting after pinging");
+                                    keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse());
+                                }
+                                _ = create_timer(WRITE_TIMEOUT).fuse() => {
+                                    tracing::debug!(%connection_id, "keepalive interval: pinging timed out");
+                                    Err(anyhow!("timed out sending keepalive"))?;
+                                }
+                            }
+                        }
                         incoming = read_message => {
                             let incoming = incoming.context("error reading rpc message from socket")?;
                             tracing::debug!(%connection_id, "incoming rpc message: received");
@@ -219,21 +234,6 @@ impl Peer {
                             }
                             break;
                         },
-                        _ = keepalive_timer => {
-                            tracing::debug!(%connection_id, "keepalive interval: pinging");
-                            futures::select_biased! {
-                                result = writer.write(proto::Message::Ping).fuse() => {
-                                    tracing::debug!(%connection_id, "keepalive interval: done pinging");
-                                    result.context("failed to send keepalive")?;
-                                    tracing::debug!(%connection_id, "keepalive interval: resetting after pinging");
-                                    keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse());
-                                }
-                                _ = create_timer(WRITE_TIMEOUT).fuse() => {
-                                    tracing::debug!(%connection_id, "keepalive interval: pinging timed out");
-                                    Err(anyhow!("timed out sending keepalive"))?;
-                                }
-                            }
-                        }
                         _ = receive_timeout => {
                             tracing::debug!(%connection_id, "receive timeout: delay between messages too long");
                             Err(anyhow!("delay between messages too long"))?
@@ -499,6 +499,13 @@ mod tests {
     use async_tungstenite::tungstenite::Message as WebSocketMessage;
     use gpui::TestAppContext;
 
+    #[ctor::ctor]
+    fn init_logger() {
+        if std::env::var("RUST_LOG").is_ok() {
+            env_logger::init();
+        }
+    }
+
     #[gpui::test(iterations = 50)]
     async fn test_request_response(cx: &mut TestAppContext) {
         let executor = cx.foreground();