Fix excessive delay before clearing stale room data

Max Brunsfeld and Antonio Scandurra created

Co-authored-by: Antonio Scandurra <antonio@zed.dev>

Change summary

crates/collab/src/db.rs                |  2 +-
crates/collab/src/integration_tests.rs | 10 +++++-----
crates/collab/src/main.rs              |  6 ++++++
crates/collab/src/rpc.rs               |  8 ++++----
4 files changed, 16 insertions(+), 10 deletions(-)

Detailed changes

crates/collab/src/db.rs 🔗

@@ -131,7 +131,7 @@ impl Database {
         .await
     }
 
-    pub async fn outdated_room_ids(&self) -> Result<Vec<RoomId>> {
+    pub async fn stale_room_ids(&self) -> Result<Vec<RoomId>> {
         self.transaction(|tx| async move {
             #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)]
             enum QueryAs {

crates/collab/src/integration_tests.rs 🔗

@@ -1,7 +1,7 @@
 use crate::{
     db::{self, NewUserParams, TestDb, UserId},
     executor::Executor,
-    rpc::{Server, RECONNECT_TIMEOUT},
+    rpc::{Server, CLEANUP_TIMEOUT, RECONNECT_TIMEOUT},
     AppState,
 };
 use anyhow::anyhow;
@@ -686,7 +686,7 @@ async fn test_server_restarts(
 
     // The server finishes restarting, cleaning up stale connections.
     server.start().await.unwrap();
-    deterministic.advance_clock(RECONNECT_TIMEOUT);
+    deterministic.advance_clock(CLEANUP_TIMEOUT);
     assert_eq!(
         room_participants(&room_a, cx_a),
         RoomParticipants {
@@ -806,7 +806,7 @@ async fn test_server_restarts(
     // The server finishes restarting, cleaning up stale connections and canceling the
     // call to user D because the room has become empty.
     server.start().await.unwrap();
-    deterministic.advance_clock(RECONNECT_TIMEOUT);
+    deterministic.advance_clock(CLEANUP_TIMEOUT);
     assert!(incoming_call_d.next().await.unwrap().is_none());
 }
 
@@ -6125,7 +6125,7 @@ async fn test_random_collaboration(
                 server.teardown();
                 deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT);
                 server.start().await.unwrap();
-                deterministic.advance_clock(RECONNECT_TIMEOUT);
+                deterministic.advance_clock(CLEANUP_TIMEOUT);
             }
             _ if !op_start_signals.is_empty() => {
                 while operations < max_operations && rng.lock().gen_bool(0.7) {
@@ -6326,7 +6326,7 @@ impl TestServer {
         );
         server.start().await.unwrap();
         // Advance clock to ensure the server's cleanup task is finished.
-        deterministic.advance_clock(RECONNECT_TIMEOUT);
+        deterministic.advance_clock(CLEANUP_TIMEOUT);
         Self {
             app_state,
             server,

crates/collab/src/main.rs 🔗

@@ -65,6 +65,12 @@ async fn main() -> Result<()> {
 
             axum::Server::from_tcp(listener)?
                 .serve(app.into_make_service_with_connect_info::<SocketAddr>())
+                .with_graceful_shutdown(async move {
+                    tokio::signal::ctrl_c()
+                        .await
+                        .expect("failed to listen for interrupt signal");
+                    rpc_server.teardown();
+                })
                 .await?;
         }
         _ => {

crates/collab/src/rpc.rs 🔗

@@ -57,7 +57,8 @@ use tokio::sync::watch;
 use tower::ServiceBuilder;
 use tracing::{info_span, instrument, Instrument};
 
-pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(60);
+pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(5);
+pub const CLEANUP_TIMEOUT: Duration = Duration::from_secs(10);
 
 lazy_static! {
     static ref METRIC_CONNECTIONS: IntGauge =
@@ -241,12 +242,12 @@ impl Server {
         self.app_state.db.delete_stale_projects().await?;
         let db = self.app_state.db.clone();
         let peer = self.peer.clone();
-        let timeout = self.executor.sleep(RECONNECT_TIMEOUT);
+        let timeout = self.executor.sleep(CLEANUP_TIMEOUT);
         let pool = self.connection_pool.clone();
         let live_kit_client = self.app_state.live_kit_client.clone();
         self.executor.spawn_detached(async move {
             timeout.await;
-            if let Some(room_ids) = db.outdated_room_ids().await.trace_err() {
+            if let Some(room_ids) = db.stale_room_ids().await.trace_err() {
                 for room_id in room_ids {
                     let mut contacts_to_update = HashSet::default();
                     let mut canceled_calls_to_user_ids = Vec::new();
@@ -323,7 +324,6 @@ impl Server {
         Ok(())
     }
 
-    #[cfg(test)]
     pub fn teardown(&self) {
         self.peer.reset();
         self.connection_pool.lock().reset();