Reset receive timeout only on reads from websocket connection, not writes

Max Brunsfeld and Keith Simmons created

Also, increase the receive timeout to 30 seconds. We'll still respond immediately
to explicit disconnection, but when there are temporary network blips that
delay pings, we think we should err on the side of keeping the connection
alive. This is in response to a false positive 'host disconnected' state
that we observed when pairing today, while the host (Keith) still clearly
had a working internet connection, because we were screen sharing.

Co-Authored-By: Keith Simmons <keith@zed.dev>

Change summary

crates/rpc/src/peer.rs | 10 ++++++----
1 file changed, 6 insertions(+), 4 deletions(-)

Detailed changes

crates/rpc/src/peer.rs 🔗

@@ -96,6 +96,7 @@ pub struct ConnectionState {
 
 const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);
 const WRITE_TIMEOUT: Duration = Duration::from_secs(2);
+const RECEIVE_TIMEOUT: Duration = Duration::from_secs(30);
 
 impl Peer {
     pub fn new() -> Arc<Self> {
@@ -147,14 +148,14 @@ impl Peer {
             let keepalive_timer = create_timer(KEEPALIVE_INTERVAL).fuse();
             futures::pin_mut!(keepalive_timer);
 
+            // Disconnect if we don't receive messages at least this frequently.
+            let receive_timeout = create_timer(RECEIVE_TIMEOUT).fuse();
+            futures::pin_mut!(receive_timeout);
+
             loop {
                 let read_message = reader.read().fuse();
                 futures::pin_mut!(read_message);
 
-                // Disconnect if we don't receive messages at least this frequently.
-                let receive_timeout = create_timer(3 * KEEPALIVE_INTERVAL).fuse();
-                futures::pin_mut!(receive_timeout);
-
                 loop {
                     futures::select_biased! {
                         outgoing = outgoing_rx.next().fuse() => match outgoing {
@@ -170,6 +171,7 @@ impl Peer {
                         },
                         incoming = read_message => {
                             let incoming = incoming.context("received invalid RPC message")?;
+                            receive_timeout.set(create_timer(RECEIVE_TIMEOUT).fuse());
                             if let proto::Message::Envelope(incoming) = incoming {
                                 if incoming_tx.send(incoming).await.is_err() {
                                     return Ok(());