diff --git a/crates/rpc/src/peer.rs b/crates/rpc/src/peer.rs index 57656c7b5c8e24e459e233e19da8e7a03cae9f25..43dc2d21806d4e23cbe3f332ca90d87b21091252 100644 --- a/crates/rpc/src/peer.rs +++ b/crates/rpc/src/peer.rs @@ -194,6 +194,21 @@ impl Peer { return Ok(()) }, }, + _ = keepalive_timer => { + tracing::debug!(%connection_id, "keepalive interval: pinging"); + futures::select_biased! { + result = writer.write(proto::Message::Ping).fuse() => { + tracing::debug!(%connection_id, "keepalive interval: done pinging"); + result.context("failed to send keepalive")?; + tracing::debug!(%connection_id, "keepalive interval: resetting after pinging"); + keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse()); + } + _ = create_timer(WRITE_TIMEOUT).fuse() => { + tracing::debug!(%connection_id, "keepalive interval: pinging timed out"); + Err(anyhow!("timed out sending keepalive"))?; + } + } + } incoming = read_message => { let incoming = incoming.context("error reading rpc message from socket")?; tracing::debug!(%connection_id, "incoming rpc message: received"); @@ -219,21 +234,6 @@ impl Peer { } break; }, - _ = keepalive_timer => { - tracing::debug!(%connection_id, "keepalive interval: pinging"); - futures::select_biased! { - result = writer.write(proto::Message::Ping).fuse() => { - tracing::debug!(%connection_id, "keepalive interval: done pinging"); - result.context("failed to send keepalive")?; - tracing::debug!(%connection_id, "keepalive interval: resetting after pinging"); - keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse()); - } - _ = create_timer(WRITE_TIMEOUT).fuse() => { - tracing::debug!(%connection_id, "keepalive interval: pinging timed out"); - Err(anyhow!("timed out sending keepalive"))?; - } - } - } _ = receive_timeout => { tracing::debug!(%connection_id, "receive timeout: delay between messages too long"); Err(anyhow!("delay between messages too long"))?