Instrument `rpc::Server::start` and reduce cleanup timeout again

Antonio Scandurra created

Change summary

crates/collab/src/rpc.rs | 148 +++++++++++++++++++++++------------------
1 file changed, 84 insertions(+), 64 deletions(-)

Detailed changes

crates/collab/src/rpc.rs 🔗

@@ -58,7 +58,7 @@ use tower::ServiceBuilder;
 use tracing::{info_span, instrument, Instrument};
 
 pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(5);
-pub const CLEANUP_TIMEOUT: Duration = Duration::from_secs(20);
+pub const CLEANUP_TIMEOUT: Duration = Duration::from_secs(10);
 
 lazy_static! {
     static ref METRIC_CONNECTIONS: IntGauge =
@@ -239,88 +239,108 @@ impl Server {
     }
 
     pub async fn start(&self) -> Result<()> {
-        self.app_state.db.delete_stale_projects().await?;
         let db = self.app_state.db.clone();
         let peer = self.peer.clone();
         let timeout = self.executor.sleep(CLEANUP_TIMEOUT);
         let pool = self.connection_pool.clone();
         let live_kit_client = self.app_state.live_kit_client.clone();
-        self.executor.spawn_detached(async move {
-            timeout.await;
-            if let Some(room_ids) = db.stale_room_ids().await.trace_err() {
-                for room_id in room_ids {
-                    let mut contacts_to_update = HashSet::default();
-                    let mut canceled_calls_to_user_ids = Vec::new();
-                    let mut live_kit_room = String::new();
-                    let mut delete_live_kit_room = false;
-
-                    if let Ok(mut refreshed_room) = db.refresh_room(room_id).await {
-                        room_updated(&refreshed_room.room, &peer);
-                        contacts_to_update
-                            .extend(refreshed_room.stale_participant_user_ids.iter().copied());
-                        contacts_to_update
-                            .extend(refreshed_room.canceled_calls_to_user_ids.iter().copied());
-                        canceled_calls_to_user_ids =
-                            mem::take(&mut refreshed_room.canceled_calls_to_user_ids);
-                        live_kit_room = mem::take(&mut refreshed_room.room.live_kit_room);
-                        delete_live_kit_room = refreshed_room.room.participants.is_empty();
-                    }
 
-                    {
-                        let pool = pool.lock();
-                        for canceled_user_id in canceled_calls_to_user_ids {
-                            for connection_id in pool.user_connection_ids(canceled_user_id) {
-                                peer.send(
-                                    connection_id,
-                                    proto::CallCanceled {
-                                        room_id: room_id.to_proto(),
-                                    },
-                                )
-                                .trace_err();
-                            }
+        let span = info_span!("start server");
+        let span_enter = span.enter();
+
+        tracing::info!("begin deleting stale projects");
+        self.app_state.db.delete_stale_projects().await?;
+        tracing::info!("finish deleting stale projects");
+
+        drop(span_enter);
+        self.executor.spawn_detached(
+            async move {
+                tracing::info!("waiting for cleanup timeout");
+                timeout.await;
+                tracing::info!("cleanup timeout expired, retrieving stale rooms");
+                if let Some(room_ids) = db.stale_room_ids().await.trace_err() {
+                    tracing::info!(stale_room_count = room_ids.len(), "retrieved stale rooms");
+                    for room_id in room_ids {
+                        let mut contacts_to_update = HashSet::default();
+                        let mut canceled_calls_to_user_ids = Vec::new();
+                        let mut live_kit_room = String::new();
+                        let mut delete_live_kit_room = false;
+
+                        if let Ok(mut refreshed_room) = db.refresh_room(room_id).await {
+                            tracing::info!(
+                                room_id = room_id.0,
+                                new_participant_count = refreshed_room.room.participants.len(),
+                                "refreshed room"
+                            );
+                            room_updated(&refreshed_room.room, &peer);
+                            contacts_to_update
+                                .extend(refreshed_room.stale_participant_user_ids.iter().copied());
+                            contacts_to_update
+                                .extend(refreshed_room.canceled_calls_to_user_ids.iter().copied());
+                            canceled_calls_to_user_ids =
+                                mem::take(&mut refreshed_room.canceled_calls_to_user_ids);
+                            live_kit_room = mem::take(&mut refreshed_room.room.live_kit_room);
+                            delete_live_kit_room = refreshed_room.room.participants.is_empty();
                         }
-                    }
 
-                    for user_id in contacts_to_update {
-                        let busy = db.is_user_busy(user_id).await.trace_err();
-                        let contacts = db.get_contacts(user_id).await.trace_err();
-                        if let Some((busy, contacts)) = busy.zip(contacts) {
+                        {
                             let pool = pool.lock();
-                            let updated_contact = contact_for_user(user_id, false, busy, &pool);
-                            for contact in contacts {
-                                if let db::Contact::Accepted {
-                                    user_id: contact_user_id,
-                                    ..
-                                } = contact
-                                {
-                                    for contact_conn_id in pool.user_connection_ids(contact_user_id)
+                            for canceled_user_id in canceled_calls_to_user_ids {
+                                for connection_id in pool.user_connection_ids(canceled_user_id) {
+                                    peer.send(
+                                        connection_id,
+                                        proto::CallCanceled {
+                                            room_id: room_id.to_proto(),
+                                        },
+                                    )
+                                    .trace_err();
+                                }
+                            }
+                        }
+
+                        for user_id in contacts_to_update {
+                            let busy = db.is_user_busy(user_id).await.trace_err();
+                            let contacts = db.get_contacts(user_id).await.trace_err();
+                            if let Some((busy, contacts)) = busy.zip(contacts) {
+                                let pool = pool.lock();
+                                let updated_contact = contact_for_user(user_id, false, busy, &pool);
+                                for contact in contacts {
+                                    if let db::Contact::Accepted {
+                                        user_id: contact_user_id,
+                                        ..
+                                    } = contact
                                     {
-                                        peer.send(
-                                            contact_conn_id,
-                                            proto::UpdateContacts {
-                                                contacts: vec![updated_contact.clone()],
-                                                remove_contacts: Default::default(),
-                                                incoming_requests: Default::default(),
-                                                remove_incoming_requests: Default::default(),
-                                                outgoing_requests: Default::default(),
-                                                remove_outgoing_requests: Default::default(),
-                                            },
-                                        )
-                                        .trace_err();
+                                        for contact_conn_id in
+                                            pool.user_connection_ids(contact_user_id)
+                                        {
+                                            peer.send(
+                                                contact_conn_id,
+                                                proto::UpdateContacts {
+                                                    contacts: vec![updated_contact.clone()],
+                                                    remove_contacts: Default::default(),
+                                                    incoming_requests: Default::default(),
+                                                    remove_incoming_requests: Default::default(),
+                                                    outgoing_requests: Default::default(),
+                                                    remove_outgoing_requests: Default::default(),
+                                                },
+                                            )
+                                            .trace_err();
+                                        }
                                     }
                                 }
                             }
                         }
-                    }
 
-                    if let Some(live_kit) = live_kit_client.as_ref() {
-                        if delete_live_kit_room {
-                            live_kit.delete_room(live_kit_room).await.trace_err();
+                        if let Some(live_kit) = live_kit_client.as_ref() {
+                            if delete_live_kit_room {
+                                live_kit.delete_room(live_kit_room).await.trace_err();
+                            }
                         }
                     }
                 }
             }
-        });
+            .instrument(span),
+        );
         Ok(())
     }