diff --git a/crates/rpc/src/peer.rs b/crates/rpc/src/peer.rs index 8f1d66e47a4b8fd92f02e2aa2c1318505af6e4a3..4156c0883d9c897ab2dc64a2d9df8c958cfa5a96 100644 --- a/crates/rpc/src/peer.rs +++ b/crates/rpc/src/peer.rs @@ -96,6 +96,7 @@ pub struct ConnectionState { const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1); const WRITE_TIMEOUT: Duration = Duration::from_secs(2); +const RECEIVE_TIMEOUT: Duration = Duration::from_secs(30); impl Peer { pub fn new() -> Arc { @@ -147,14 +148,14 @@ impl Peer { let keepalive_timer = create_timer(KEEPALIVE_INTERVAL).fuse(); futures::pin_mut!(keepalive_timer); + // Disconnect if we don't receive messages at least this frequently. + let receive_timeout = create_timer(RECEIVE_TIMEOUT).fuse(); + futures::pin_mut!(receive_timeout); + loop { let read_message = reader.read().fuse(); futures::pin_mut!(read_message); - // Disconnect if we don't receive messages at least this frequently. - let receive_timeout = create_timer(3 * KEEPALIVE_INTERVAL).fuse(); - futures::pin_mut!(receive_timeout); - loop { futures::select_biased! { outgoing = outgoing_rx.next().fuse() => match outgoing { @@ -170,6 +171,7 @@ impl Peer { }, incoming = read_message => { let incoming = incoming.context("received invalid RPC message")?; + receive_timeout.set(create_timer(RECEIVE_TIMEOUT).fuse()); if let proto::Message::Envelope(incoming) = incoming { if incoming_tx.send(incoming).await.is_err() { return Ok(());