Add more information to our logs (#35557)

Mikayla Maki and Max Brunsfeld created

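Add more logging to collab to help diagnose throughput issues: the "receive message" span and the sign-out log now record the number of concurrent message handlers, and forwarded requests log when they start and how long they waited for the host.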

IMPORTANT: Do not deploy this PR without pinging me.

Release Notes:

- N/A

---------

Co-authored-by: Max Brunsfeld <maxbrunsfeld@gmail.com>

Change summary

crates/collab/src/rpc.rs | 82 ++++++++++++++++++++++-------------------
crates/rpc/src/peer.rs   | 18 +++++++++
2 files changed, 61 insertions(+), 39 deletions(-)

Detailed changes

crates/collab/src/rpc.rs 🔗

@@ -315,7 +315,7 @@ impl Server {
             .add_request_handler(forward_read_only_project_request::<proto::GetDefinition>)
             .add_request_handler(forward_read_only_project_request::<proto::GetTypeDefinition>)
             .add_request_handler(forward_read_only_project_request::<proto::GetReferences>)
-            .add_request_handler(forward_find_search_candidates_request)
+            .add_request_handler(forward_read_only_project_request::<proto::FindSearchCandidates>)
             .add_request_handler(forward_read_only_project_request::<proto::GetDocumentHighlights>)
             .add_request_handler(forward_read_only_project_request::<proto::GetDocumentSymbols>)
             .add_request_handler(forward_read_only_project_request::<proto::GetProjectSymbols>)
@@ -666,7 +666,6 @@ impl Server {
                     let total_duration_ms = received_at.elapsed().as_micros() as f64 / 1000.0;
                     let processing_duration_ms = start_time.elapsed().as_micros() as f64 / 1000.0;
                     let queue_duration_ms = total_duration_ms - processing_duration_ms;
-                    let payload_type = M::NAME;
 
                     match result {
                         Err(error) => {
@@ -675,7 +674,6 @@ impl Server {
                                 total_duration_ms,
                                 processing_duration_ms,
                                 queue_duration_ms,
-                                payload_type,
                                 "error handling message"
                             )
                         }
@@ -780,12 +778,11 @@ impl Server {
         async move {
             if *teardown.borrow() {
                 tracing::error!("server is tearing down");
-                return
+                return;
             }
 
-            let (connection_id, handle_io, mut incoming_rx) = this
-                .peer
-                .add_connection(connection, {
+            let (connection_id, handle_io, mut incoming_rx) =
+                this.peer.add_connection(connection, {
                     let executor = executor.clone();
                     move |duration| executor.sleep(duration)
                 });
@@ -802,10 +799,14 @@ impl Server {
                 }
             };
 
-            let supermaven_client = this.app_state.config.supermaven_admin_api_key.clone().map(|supermaven_admin_api_key| Arc::new(SupermavenAdminApi::new(
-                    supermaven_admin_api_key.to_string(),
-                    http_client.clone(),
-                )));
+            let supermaven_client = this.app_state.config.supermaven_admin_api_key.clone().map(
+                |supermaven_admin_api_key| {
+                    Arc::new(SupermavenAdminApi::new(
+                        supermaven_admin_api_key.to_string(),
+                        http_client.clone(),
+                    ))
+                },
+            );
 
             let session = Session {
                 principal: principal.clone(),
@@ -820,7 +821,15 @@ impl Server {
                 supermaven_client,
             };
 
-            if let Err(error) = this.send_initial_client_update(connection_id, zed_version, send_connection_id, &session).await {
+            if let Err(error) = this
+                .send_initial_client_update(
+                    connection_id,
+                    zed_version,
+                    send_connection_id,
+                    &session,
+                )
+                .await
+            {
                 tracing::error!(?error, "failed to send initial client update");
                 return;
             }
@@ -837,14 +846,22 @@ impl Server {
             //
             // This arrangement ensures we will attempt to process earlier messages first, but fall
             // back to processing messages arrived later in the spirit of making progress.
+            const MAX_CONCURRENT_HANDLERS: usize = 256;
             let mut foreground_message_handlers = FuturesUnordered::new();
-            let concurrent_handlers = Arc::new(Semaphore::new(256));
+            let concurrent_handlers = Arc::new(Semaphore::new(MAX_CONCURRENT_HANDLERS));
+            let get_concurrent_handlers = {
+                let concurrent_handlers = concurrent_handlers.clone();
+                move || MAX_CONCURRENT_HANDLERS - concurrent_handlers.available_permits()
+            };
             loop {
                 let next_message = async {
                     let permit = concurrent_handlers.clone().acquire_owned().await.unwrap();
                     let message = incoming_rx.next().await;
-                    (permit, message)
-                }.fuse();
+                    // Cache the concurrent_handlers here, so that we know what the
+                    // queue looks like as each handler starts
+                    (permit, message, get_concurrent_handlers())
+                }
+                .fuse();
                 futures::pin_mut!(next_message);
                 futures::select_biased! {
                     _ = teardown.changed().fuse() => return,
@@ -856,12 +873,16 @@ impl Server {
                     }
                     _ = foreground_message_handlers.next() => {}
                     next_message = next_message => {
-                        let (permit, message) = next_message;
+                        let (permit, message, concurrent_handlers) = next_message;
                         if let Some(message) = message {
                             let type_name = message.payload_type_name();
                             // note: we copy all the fields from the parent span so we can query them in the logs.
                             // (https://github.com/tokio-rs/tracing/issues/2670).
-                            let span = tracing::info_span!("receive message", %connection_id, %address, type_name,
+                            let span = tracing::info_span!("receive message",
+                                %connection_id,
+                                %address,
+                                type_name,
+                                concurrent_handlers,
                                 user_id=field::Empty,
                                 login=field::Empty,
                                 impersonator=field::Empty,
@@ -895,12 +916,13 @@ impl Server {
             }
 
             drop(foreground_message_handlers);
-            tracing::info!("signing out");
+            let concurrent_handlers = get_concurrent_handlers();
+            tracing::info!(concurrent_handlers, "signing out");
             if let Err(error) = connection_lost(session, teardown, executor).await {
                 tracing::error!(?error, "error signing out");
             }
-
-        }.instrument(span)
+        }
+        .instrument(span)
     }
 
     async fn send_initial_client_update(
@@ -2286,25 +2308,6 @@ where
     Ok(())
 }
 
-async fn forward_find_search_candidates_request(
-    request: proto::FindSearchCandidates,
-    response: Response<proto::FindSearchCandidates>,
-    session: Session,
-) -> Result<()> {
-    let project_id = ProjectId::from_proto(request.remote_entity_id());
-    let host_connection_id = session
-        .db()
-        .await
-        .host_for_read_only_project_request(project_id, session.connection_id)
-        .await?;
-    let payload = session
-        .peer
-        .forward_request(session.connection_id, host_connection_id, request)
-        .await?;
-    response.send(payload)?;
-    Ok(())
-}
-
 /// forward a project request to the host. These requests are disallowed
 /// for guests.
 async fn forward_mutating_project_request<T>(
@@ -2336,6 +2339,7 @@ async fn multi_lsp_query(
     session: Session,
 ) -> Result<()> {
     tracing::Span::current().record("multi_lsp_query_request", request.request_str());
+    tracing::info!("multi_lsp_query message received");
     forward_mutating_project_request(request, response, session).await
 }
 

crates/rpc/src/peer.rs 🔗

@@ -422,8 +422,26 @@ impl Peer {
         receiver_id: ConnectionId,
         request: T,
     ) -> impl Future<Output = Result<T::Response>> {
+        let request_start_time = Instant::now();
+        let payload_type = T::NAME;
+        let elapsed_time = move || request_start_time.elapsed().as_millis();
+        tracing::info!(payload_type, "start forwarding request");
         self.request_internal(Some(sender_id), receiver_id, request)
             .map_ok(|envelope| envelope.payload)
+            .inspect_err(move |_| {
+                tracing::error!(
+                    waiting_for_host_ms = elapsed_time(),
+                    payload_type,
+                    "error forwarding request"
+                )
+            })
+            .inspect_ok(move |_| {
+                tracing::info!(
+                    waiting_for_host_ms = elapsed_time(),
+                    payload_type,
+                    "finished forwarding request"
+                )
+            })
     }
 
     fn request_internal<T: RequestMessage>(