Remove non-determinism from `Peer` caused by smol's `timeout` helper

Antonio Scandurra created

Change summary

crates/collab/src/integration_tests.rs |  2 
crates/rpc/src/peer.rs                 | 59 +++++++++++++++------------
2 files changed, 34 insertions(+), 27 deletions(-)

Detailed changes

crates/collab/src/integration_tests.rs 🔗

@@ -4722,7 +4722,7 @@ async fn test_random_collaboration(
                 op_start_signals.remove(guest_ix);
                 server.forbid_connections();
                 server.disconnect_client(removed_guest_id);
-                deterministic.advance_clock(5 * RECEIVE_TIMEOUT);
+                deterministic.advance_clock(RECEIVE_TIMEOUT);
                 deterministic.start_waiting();
                 let (guest, guest_project, mut guest_cx, guest_err) = guest.await;
                 deterministic.finish_waiting();

crates/rpc/src/peer.rs 🔗

@@ -11,7 +11,6 @@ use futures::{
 };
 use parking_lot::{Mutex, RwLock};
 use serde::{ser::SerializeStruct, Serialize};
-use smol_timeout::TimeoutExt;
 use std::sync::atomic::Ordering::SeqCst;
 use std::{
     fmt,
@@ -177,14 +176,17 @@ impl Peer {
                         outgoing = outgoing_rx.next().fuse() => match outgoing {
                             Some(outgoing) => {
                                 tracing::debug!(%connection_id, "outgoing rpc message: writing");
-                                if let Some(result) = writer.write(outgoing).timeout(WRITE_TIMEOUT).await {
-                                    tracing::debug!(%connection_id, "outgoing rpc message: done writing");
-                                    result.context("failed to write RPC message")?;
-                                    tracing::debug!(%connection_id, "keepalive interval: resetting after sending message");
-                                    keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse());
-                                } else {
-                                    tracing::debug!(%connection_id, "outgoing rpc message: writing timed out");
-                                    Err(anyhow!("timed out writing message"))?;
+                                futures::select_biased! {
+                                    result = writer.write(outgoing).fuse() => {
+                                        tracing::debug!(%connection_id, "outgoing rpc message: done writing");
+                                        result.context("failed to write RPC message")?;
+                                        tracing::debug!(%connection_id, "keepalive interval: resetting after sending message");
+                                        keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse());
+                                    }
+                                    _ = create_timer(WRITE_TIMEOUT).fuse() => {
+                                        tracing::debug!(%connection_id, "outgoing rpc message: writing timed out");
+                                        Err(anyhow!("timed out writing message"))?;
+                                    }
                                 }
                             }
                             None => {
@@ -199,32 +201,37 @@ impl Peer {
                             receive_timeout.set(create_timer(RECEIVE_TIMEOUT).fuse());
                             if let proto::Message::Envelope(incoming) = incoming {
                                 tracing::debug!(%connection_id, "incoming rpc message: processing");
-                                match incoming_tx.send(incoming).timeout(RECEIVE_TIMEOUT).await {
-                                    Some(Ok(_)) => {
-                                        tracing::debug!(%connection_id, "incoming rpc message: processed");
-                                    },
-                                    Some(Err(_)) => {
-                                        tracing::debug!(%connection_id, "incoming rpc message: channel closed");
-                                        return Ok(())
+                                futures::select_biased! {
+                                    result = incoming_tx.send(incoming).fuse() => match result {
+                                        Ok(_) => {
+                                            tracing::debug!(%connection_id, "incoming rpc message: processed");
+                                        }
+                                        Err(_) => {
+                                            tracing::debug!(%connection_id, "incoming rpc message: channel closed");
+                                            return Ok(())
+                                        }
                                     },
-                                    None => {
+                                    _ = create_timer(WRITE_TIMEOUT).fuse() => {
                                         tracing::debug!(%connection_id, "incoming rpc message: processing timed out");
                                         Err(anyhow!("timed out processing incoming message"))?
-                                    },
+                                    }
                                 }
                             }
                             break;
                         },
                         _ = keepalive_timer => {
                             tracing::debug!(%connection_id, "keepalive interval: pinging");
-                            if let Some(result) = writer.write(proto::Message::Ping).timeout(WRITE_TIMEOUT).await {
-                                tracing::debug!(%connection_id, "keepalive interval: done pinging");
-                                result.context("failed to send keepalive")?;
-                                tracing::debug!(%connection_id, "keepalive interval: resetting after pinging");
-                                keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse());
-                            } else {
-                                tracing::debug!(%connection_id, "keepalive interval: pinging timed out");
-                                Err(anyhow!("timed out sending keepalive"))?;
+                            futures::select_biased! {
+                                result = writer.write(proto::Message::Ping).fuse() => {
+                                    tracing::debug!(%connection_id, "keepalive interval: done pinging");
+                                    result.context("failed to send keepalive")?;
+                                    tracing::debug!(%connection_id, "keepalive interval: resetting after pinging");
+                                    keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse());
+                                }
+                                _ = create_timer(WRITE_TIMEOUT).fuse() => {
+                                    tracing::debug!(%connection_id, "keepalive interval: pinging timed out");
+                                    Err(anyhow!("timed out sending keepalive"))?;
+                                }
                             }
                         }
                         _ = receive_timeout => {