diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index 999c87f9ee38f74597f92148ccba40c548a27ce1..892ea4ccd658faab2ccea8b685110a6d2bb6057a 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -131,7 +131,7 @@ impl Database { .await } - pub async fn outdated_room_ids(&self) -> Result> { + pub async fn stale_room_ids(&self) -> Result> { self.transaction(|tx| async move { #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] enum QueryAs { diff --git a/crates/collab/src/integration_tests.rs b/crates/collab/src/integration_tests.rs index 8150b1c0af53563d068603f73434f120560b536c..bee8f9a34fd608d7c07a98de7b273ea66c624b1f 100644 --- a/crates/collab/src/integration_tests.rs +++ b/crates/collab/src/integration_tests.rs @@ -1,7 +1,7 @@ use crate::{ db::{self, NewUserParams, TestDb, UserId}, executor::Executor, - rpc::{Server, RECONNECT_TIMEOUT}, + rpc::{Server, CLEANUP_TIMEOUT, RECONNECT_TIMEOUT}, AppState, }; use anyhow::anyhow; @@ -686,7 +686,7 @@ async fn test_server_restarts( // The server finishes restarting, cleaning up stale connections. server.start().await.unwrap(); - deterministic.advance_clock(RECONNECT_TIMEOUT); + deterministic.advance_clock(CLEANUP_TIMEOUT); assert_eq!( room_participants(&room_a, cx_a), RoomParticipants { @@ -806,7 +806,7 @@ async fn test_server_restarts( // The server finishes restarting, cleaning up stale connections and canceling the // call to user D because the room has become empty. server.start().await.unwrap(); - deterministic.advance_clock(RECONNECT_TIMEOUT); + deterministic.advance_clock(CLEANUP_TIMEOUT); assert!(incoming_call_d.next().await.unwrap().is_none()); } @@ -6125,7 +6125,7 @@ async fn test_random_collaboration( server.teardown(); deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT); server.start().await.unwrap(); - deterministic.advance_clock(RECONNECT_TIMEOUT); + deterministic.advance_clock(CLEANUP_TIMEOUT); } _ if !op_start_signals.is_empty() => { while operations < max_operations && rng.lock().gen_bool(0.7) { @@ -6326,7 +6326,7 @@ impl TestServer { ); server.start().await.unwrap(); // Advance clock to ensure the server's cleanup task is finished. - deterministic.advance_clock(RECONNECT_TIMEOUT); + deterministic.advance_clock(CLEANUP_TIMEOUT); Self { app_state, server, diff --git a/crates/collab/src/main.rs b/crates/collab/src/main.rs index 710910fe033bdca14993e60a986fde578c9bb59a..8ad1313d1ea8bff15b54793aa40aa97405d20c8a 100644 --- a/crates/collab/src/main.rs +++ b/crates/collab/src/main.rs @@ -65,6 +65,12 @@ async fn main() -> Result<()> { axum::Server::from_tcp(listener)? .serve(app.into_make_service_with_connect_info::()) + .with_graceful_shutdown(async move { + tokio::signal::ctrl_c() + .await + .expect("failed to listen for interrupt signal"); + rpc_server.teardown(); + }) .await?; } _ => { diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index e52ecbb0c275900d4cc7644f321d71c5d8863adf..69794ab35aec448c1bec30a5e8f0a37af72f10da 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -57,7 +57,8 @@ use tokio::sync::watch; use tower::ServiceBuilder; use tracing::{info_span, instrument, Instrument}; -pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(60); +pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(5); +pub const CLEANUP_TIMEOUT: Duration = Duration::from_secs(10); lazy_static! { static ref METRIC_CONNECTIONS: IntGauge = @@ -241,12 +242,12 @@ impl Server { self.app_state.db.delete_stale_projects().await?; let db = self.app_state.db.clone(); let peer = self.peer.clone(); - let timeout = self.executor.sleep(RECONNECT_TIMEOUT); + let timeout = self.executor.sleep(CLEANUP_TIMEOUT); let pool = self.connection_pool.clone(); let live_kit_client = self.app_state.live_kit_client.clone(); self.executor.spawn_detached(async move { timeout.await; - if let Some(room_ids) = db.outdated_room_ids().await.trace_err() { + if let Some(room_ids) = db.stale_room_ids().await.trace_err() { for room_id in room_ids { let mut contacts_to_update = HashSet::default(); let mut canceled_calls_to_user_ids = Vec::new(); @@ -323,7 +324,6 @@ impl Server { Ok(()) } - #[cfg(test)] pub fn teardown(&self) { self.peer.reset(); self.connection_pool.lock().reset();