From 5dc82d3df8d0b32ed7c327b25b019d6d6c4d5d12 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Thu, 20 Oct 2022 14:34:05 +0200 Subject: [PATCH] Delete all live-kit rooms when server is shut down --- crates/collab/src/api.rs | 4 +-- crates/collab/src/main.rs | 56 ++++++++++++++++++++++++++++++++-- crates/collab/src/rpc/store.rs | 4 +++ 3 files changed, 60 insertions(+), 4 deletions(-) diff --git a/crates/collab/src/api.rs b/crates/collab/src/api.rs index 08dfa91ba98df3b1c54e8690dbd31bd169da936c..0dd54989785681f5115e7d7adf20e65ab5dbd1fb 100644 --- a/crates/collab/src/api.rs +++ b/crates/collab/src/api.rs @@ -22,7 +22,7 @@ use time::OffsetDateTime; use tower::ServiceBuilder; use tracing::instrument; -pub fn routes(rpc_server: &Arc, state: Arc) -> Router { +pub fn routes(rpc_server: Arc, state: Arc) -> Router { Router::new() .route("/user", get(get_authenticated_user)) .route("/users", get(get_users).post(create_user)) @@ -50,7 +50,7 @@ pub fn routes(rpc_server: &Arc, state: Arc) -> Router Result<()> { rpc_server.start_recording_project_activity(Duration::from_secs(5 * 60), rpc::RealExecutor); let app = Router::::new() - .merge(api::routes(&rpc_server, state.clone())) - .merge(rpc::routes(rpc_server)); + .merge(api::routes(rpc_server.clone(), state.clone())) + .merge(rpc::routes(rpc_server.clone())); axum::Server::from_tcp(listener)? .serve(app.into_make_service_with_connect_info::()) + .with_graceful_shutdown(graceful_shutdown(rpc_server, state)) .await?; Ok(()) @@ -133,3 +136,52 @@ pub fn init_tracing(config: &Config) -> Option<()> { None } + +async fn graceful_shutdown(rpc_server: Arc, state: Arc) { + let ctrl_c = async { + signal::ctrl_c() + .await + .expect("failed to install Ctrl+C handler"); + }; + + #[cfg(unix)] + let terminate = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("failed to install signal handler") + .recv() + .await; + }; + + #[cfg(not(unix))] + let terminate = std::future::pending::<()>(); + + tokio::select! { + _ = ctrl_c => {}, + _ = terminate => {}, + } + + if let Some(live_kit) = state.live_kit_client.as_ref() { + let deletions = rpc_server + .store() + .await + .rooms() + .values() + .map(|room| { + let name = room.live_kit_room.clone(); + async { + live_kit.delete_room(name).await.trace_err(); + } + }) + .collect::>(); + + tracing::info!("deleting all live-kit rooms"); + if let Err(_) = tokio::time::timeout( + Duration::from_secs(10), + futures::future::join_all(deletions), + ) + .await + { + tracing::error!("timed out waiting for live-kit room deletion"); + } + } +} diff --git a/crates/collab/src/rpc/store.rs b/crates/collab/src/rpc/store.rs index 9a2e8cbddae53af67551cf63e1658f29c58ce87f..a7abce7094b1c8fdfb062d98e21c9f859b58cdfa 100644 --- a/crates/collab/src/rpc/store.rs +++ b/crates/collab/src/rpc/store.rs @@ -519,6 +519,10 @@ impl Store { self.rooms.get(&room_id) } + pub fn rooms(&self) -> &BTreeMap { + &self.rooms + } + pub fn call( &mut self, room_id: RoomId,