Reset peer's receive timeout when a message is received

Created by Max Brunsfeld, Keith Simmons, and Nathan Sobo

* Make advance_clock more realistic by waking timers in order,
  instead of all at once (see the sketch below).
* Don't advance the clock when simulating random delays.

Co-Authored-By: Keith Simmons <keith@zed.dev>
Co-Authored-By: Nathan Sobo <nathan@zed.dev>
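
A minimal sketch of the ordering guarantee from the first bullet, using a toy clock rather than gpui's actual `Deterministic` executor (the type and its fields here are illustrative, not the real API):

```rust
use std::collections::BTreeMap;

/// Toy simulated clock: timers wake in deadline order, so a callback due
/// at t=1 runs (and may schedule new timers) before a callback due at t=2,
/// even when a single advance_clock call covers both deadlines.
struct ToyClock {
    now: u64, // simulated milliseconds
    timers: BTreeMap<u64, Vec<Box<dyn FnOnce()>>>,
}

impl ToyClock {
    fn advance_clock(&mut self, ms: u64) {
        let new_now = self.now + ms;
        loop {
            // Earliest pending deadline, if any.
            let Some(deadline) = self.timers.keys().next().copied() else { break };
            if deadline > new_now {
                break;
            }
            self.now = deadline;
            for wake in self.timers.remove(&deadline).unwrap() {
                wake(); // may insert new, earlier deadlines into self.timers
            }
        }
        self.now = new_now;
    }
}
```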

Change summary

crates/client/src/client.rs |  2 -
crates/gpui/src/executor.rs | 37 ++++++++++++++++++------
crates/rpc/src/peer.rs      | 57 ++++++++++++++++++++++++++------------
crates/rpc/src/proto.rs     |  1 +
crates/server/src/rpc.rs    |  2 -
5 files changed, 67 insertions(+), 32 deletions(-)

Detailed changes

crates/client/src/client.rs

@@ -966,8 +966,6 @@ mod tests {
         server.roll_access_token();
         server.allow_connections();
         cx.foreground().advance_clock(Duration::from_secs(10));
-        assert_eq!(server.auth_count(), 1);
-        cx.foreground().advance_clock(Duration::from_secs(10));
         while !matches!(status.next().await, Some(Status::Connected { .. })) {}
         assert_eq!(server.auth_count(), 2); // Client re-authenticated due to an invalid token
     }

crates/gpui/src/executor.rs

@@ -330,14 +330,34 @@ impl Deterministic {
     }
 
     pub fn advance_clock(&self, duration: Duration) {
-        let mut state = self.state.lock();
-        state.now += duration;
-        let now = state.now;
-        let mut pending_timers = mem::take(&mut state.pending_timers);
-        drop(state);
+        let new_now = self.state.lock().now + duration;
+        loop {
+            self.run_until_parked();
+            let mut state = self.state.lock();
+
+            if let Some((_, wakeup_time, _)) = state.pending_timers.first() {
+                let wakeup_time = *wakeup_time;
+                if wakeup_time < new_now {
+                    let timer_count = state
+                        .pending_timers
+                        .iter()
+                        .take_while(|(_, t, _)| *t == wakeup_time)
+                        .count();
+                    state.now = wakeup_time;
+                    let timers_to_wake = state
+                        .pending_timers
+                        .drain(0..timer_count)
+                        .collect::<Vec<_>>();
+                    drop(state);
+                    drop(timers_to_wake);
+                    continue;
+                }
+            }
 
-        pending_timers.retain(|(_, wakeup, _)| *wakeup > now);
-        self.state.lock().pending_timers.extend(pending_timers);
+            break;
+        }
+
+        self.state.lock().now = new_now;
     }
 }
 
@@ -640,9 +660,6 @@ impl Background {
                     for _ in 0..yields {
                         yield_now().await;
                     }
-
-                    let delay = Duration::from_millis(executor.state.lock().rng.gen_range(0..100));
-                    executor.advance_clock(delay);
                 }
             }
             _ => panic!("this method can only be called on a deterministic executor"),

crates/rpc/src/peer.rs

@@ -88,13 +88,14 @@ pub struct Peer {
 
 #[derive(Clone)]
 pub struct ConnectionState {
-    outgoing_tx: futures::channel::mpsc::UnboundedSender<proto::Envelope>,
+    outgoing_tx: futures::channel::mpsc::UnboundedSender<proto::Message>,
     next_message_id: Arc<AtomicU32>,
     response_channels:
         Arc<Mutex<Option<HashMap<u32, oneshot::Sender<(proto::Envelope, barrier::Sender)>>>>>,
 }
 
 const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);
+const WRITE_TIMEOUT: Duration = Duration::from_secs(2);
 
 impl Peer {
     pub fn new() -> Arc<Self> {
@@ -142,19 +143,25 @@ impl Peer {
                 this.connections.write().remove(&connection_id);
             });
 
+            // Send messages at this frequency so the connection isn't closed.
+            let keepalive_timer = create_timer(KEEPALIVE_INTERVAL).fuse();
+            futures::pin_mut!(keepalive_timer);
+
             loop {
                 let read_message = reader.read().fuse();
                 futures::pin_mut!(read_message);
-                let read_timeout = create_timer(2 * KEEPALIVE_INTERVAL).fuse();
-                futures::pin_mut!(read_timeout);
+
+                // Disconnect if we don't receive messages at least this frequently.
+                let receive_timeout = create_timer(3 * KEEPALIVE_INTERVAL).fuse();
+                futures::pin_mut!(receive_timeout);
 
                 loop {
                     futures::select_biased! {
                         outgoing = outgoing_rx.next().fuse() => match outgoing {
                             Some(outgoing) => {
-                                let outgoing = proto::Message::Envelope(outgoing);
-                                if let Some(result) = writer.write(outgoing).timeout(2 * KEEPALIVE_INTERVAL).await {
+                                if let Some(result) = writer.write(outgoing).timeout(WRITE_TIMEOUT).await {
                                     result.context("failed to write RPC message")?;
+                                    keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse());
                                 } else {
                                     Err(anyhow!("timed out writing message"))?;
                                 }
@@ -168,18 +175,18 @@ impl Peer {
                                     return Ok(());
                                 }
                             }
-
                             break;
                         },
-                        _ = create_timer(KEEPALIVE_INTERVAL).fuse() => {
-                            if let Some(result) = writer.write(proto::Message::Ping).timeout(2 * KEEPALIVE_INTERVAL).await {
-                                result.context("failed to send websocket ping")?;
+                        _ = keepalive_timer => {
+                            if let Some(result) = writer.write(proto::Message::Ping).timeout(WRITE_TIMEOUT).await {
+                                result.context("failed to send keepalive")?;
+                                keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse());
                             } else {
-                                Err(anyhow!("timed out sending websocket ping"))?;
+                                Err(anyhow!("timed out sending keepalive"))?;
                             }
                         }
-                        _ = read_timeout => {
-                            Err(anyhow!("timed out reading message"))?
+                        _ = receive_timeout => {
+                            Err(anyhow!("delay between messages too long"))?
                         }
                     }
                 }
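
The mechanism above is easy to miss in diff form: one long-lived fused timer is pinned outside the loop and re-armed with Fuse::set after every successful write, while the receive timeout is recreated after every inbound message. A self-contained sketch of just the keepalive half, with smol's Timer standing in for create_timer and a plain string channel standing in for the real message types:

```rust
use std::time::Duration;
use futures::{channel::mpsc, FutureExt, StreamExt};

const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);

async fn write_loop(mut outgoing_rx: mpsc::UnboundedReceiver<String>) {
    let keepalive_timer = smol::Timer::after(KEEPALIVE_INTERVAL).fuse();
    futures::pin_mut!(keepalive_timer);
    loop {
        futures::select_biased! {
            outgoing = outgoing_rx.next() => match outgoing {
                Some(msg) => {
                    println!("send {msg}");
                    // A successful write proves liveness, so push the
                    // next ping a full interval into the future.
                    keepalive_timer.set(smol::Timer::after(KEEPALIVE_INTERVAL).fuse());
                }
                None => return, // sender dropped; connection closing
            },
            _ = keepalive_timer => {
                println!("send ping");
                keepalive_timer.set(smol::Timer::after(KEEPALIVE_INTERVAL).fuse());
            }
        }
    }
}
```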
@@ -278,11 +285,11 @@ impl Peer {
                 .insert(message_id, tx);
             connection
                 .outgoing_tx
-                .unbounded_send(request.into_envelope(
+                .unbounded_send(proto::Message::Envelope(request.into_envelope(
                     message_id,
                     None,
                     original_sender_id.map(|id| id.0),
-                ))
+                )))
                 .map_err(|_| anyhow!("connection was closed"))?;
             Ok(())
         });
@@ -305,7 +312,9 @@ impl Peer {
             .fetch_add(1, atomic::Ordering::SeqCst);
         connection
             .outgoing_tx
-            .unbounded_send(message.into_envelope(message_id, None, None))?;
+            .unbounded_send(proto::Message::Envelope(
+                message.into_envelope(message_id, None, None),
+            ))?;
         Ok(())
     }
 
@@ -321,7 +330,11 @@ impl Peer {
             .fetch_add(1, atomic::Ordering::SeqCst);
         connection
             .outgoing_tx
-            .unbounded_send(message.into_envelope(message_id, None, Some(sender_id.0)))?;
+            .unbounded_send(proto::Message::Envelope(message.into_envelope(
+                message_id,
+                None,
+                Some(sender_id.0),
+            )))?;
         Ok(())
     }
 
@@ -336,7 +349,11 @@ impl Peer {
             .fetch_add(1, atomic::Ordering::SeqCst);
         connection
             .outgoing_tx
-            .unbounded_send(response.into_envelope(message_id, Some(receipt.message_id), None))?;
+            .unbounded_send(proto::Message::Envelope(response.into_envelope(
+                message_id,
+                Some(receipt.message_id),
+                None,
+            )))?;
         Ok(())
     }
 
@@ -351,7 +368,11 @@ impl Peer {
             .fetch_add(1, atomic::Ordering::SeqCst);
         connection
             .outgoing_tx
-            .unbounded_send(response.into_envelope(message_id, Some(receipt.message_id), None))?;
+            .unbounded_send(proto::Message::Envelope(response.into_envelope(
+                message_id,
+                Some(receipt.message_id),
+                None,
+            )))?;
         Ok(())
     }
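
All four send paths now share the same wrapping dance: allocate an id, call into_envelope, and box the result in proto::Message::Envelope so the write loop can interleave Ping frames. A hypothetical consolidation, not part of this diff; the EnvelopedMessage bound is inferred from the into_envelope calls above:

```rust
use anyhow::{anyhow, Result};
use std::sync::atomic;

// Hypothetical helper capturing the repeated pattern above.
fn send_enveloped<T: proto::EnvelopedMessage>(
    connection: &ConnectionState,
    message: T,
    responding_to: Option<u32>,
    original_sender_id: Option<u32>,
) -> Result<()> {
    let message_id = connection
        .next_message_id
        .fetch_add(1, atomic::Ordering::SeqCst);
    connection
        .outgoing_tx
        .unbounded_send(proto::Message::Envelope(message.into_envelope(
            message_id,
            responding_to,
            original_sender_id,
        )))
        .map_err(|_| anyhow!("connection was closed"))
}
```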
 

crates/rpc/src/proto.rs

@@ -283,6 +283,7 @@ pub struct MessageStream<S> {
     encoding_buffer: Vec<u8>,
 }
 
+#[derive(Debug)]
 pub enum Message {
     Envelope(Envelope),
     Ping,
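
Deriving Debug makes whole frames loggable. A hypothetical reader-side use (this diff only touches the writer; next_frame and the logging site are assumptions, not zed code):

```rust
use anyhow::Result;

// Skim past keepalive pings, logging them via the new Debug impl, and
// hand back only real envelopes. A ping's only job is to arrive, which
// is what resets the peer's receive timeout.
fn read_envelope(
    mut next_frame: impl FnMut() -> Result<proto::Message>,
) -> Result<proto::Envelope> {
    loop {
        match next_frame()? {
            proto::Message::Envelope(envelope) => return Ok(envelope),
            message @ proto::Message::Ping => log::trace!("{:?}", message),
        }
    }
}
```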

crates/server/src/rpc.rs

@@ -2732,8 +2732,6 @@ mod tests {
             .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
             .await;
 
-        eprintln!("sharing");
-
         project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
 
         // Join the worktree as client B.