collab logging (#9420)

Conrad Irwin created

- **Log errors inside the connection span**
- **Tidy up collab logging**


Release Notes:

- N/A

Change summary

crates/collab/src/main.rs |  24 +++++-
crates/collab/src/rpc.rs  | 140 ++++++++++++++++++++++++----------------
2 files changed, 101 insertions(+), 63 deletions(-)

Detailed changes

crates/collab/src/main.rs 🔗

@@ -1,5 +1,10 @@
 use anyhow::anyhow;
-use axum::{extract::MatchedPath, http::Request, routing::get, Extension, Router};
+use axum::{
+    extract::MatchedPath,
+    http::{Request, Response},
+    routing::get,
+    Extension, Router,
+};
 use collab::{
     api::fetch_extensions_from_blob_store_periodically, db, env, executor::Executor, AppState,
     Config, MigrateConfig, Result,
@@ -10,11 +15,11 @@ use std::{
     net::{SocketAddr, TcpListener},
     path::Path,
     sync::Arc,
+    time::Duration,
 };
 #[cfg(unix)]
 use tokio::signal::unix::SignalKind;
-use tower_http::trace::{self, TraceLayer};
-use tracing::Level;
+use tower_http::trace::TraceLayer;
 use tracing_subscriber::{
     filter::EnvFilter, fmt::format::JsonFields, util::SubscriberInitExt, Layer,
 };
@@ -107,7 +112,16 @@ async fn main() -> Result<()> {
                                 matched_path,
                             )
                         })
-                        .on_response(trace::DefaultOnResponse::new().level(Level::INFO)),
+                        .on_response(
+                            |response: &Response<_>, latency: Duration, _: &tracing::Span| {
+                                let duration_ms = latency.as_micros() as f64 / 1000.;
+                                tracing::info!(
+                                    duration_ms,
+                                    status = response.status().as_u16(),
+                                    "finished processing request"
+                                );
+                            },
+                        ),
                 );
 
             #[cfg(unix)]
@@ -192,7 +206,7 @@ pub fn init_tracing(config: &Config) -> Option<()> {
                         tracing_subscriber::fmt::format()
                             .json()
                             .flatten_event(true)
-                            .with_span_list(true),
+                            .with_span_list(false),
                     )
                     .with_filter(filter),
             ) as Box<dyn Layer<_> + Send + Sync>

crates/collab/src/rpc.rs 🔗

@@ -467,16 +467,9 @@ impl Server {
             Box::new(move |envelope, session| {
                 let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
                 let received_at = envelope.received_at;
-                let span = info_span!(
-                    "handle message",
-                    payload_type = envelope.payload_type_name()
-                );
-                span.in_scope(|| {
                     tracing::info!(
-                        payload_type = envelope.payload_type_name(),
                         "message received"
                     );
-                });
                 let start_time = Instant::now();
                 let future = (handler)(*envelope, session);
                 async move {
@@ -491,7 +484,6 @@ impl Server {
                         Ok(()) => tracing::info!(total_duration_ms, processing_duration_ms, queue_duration_ms, "finished handling message"),
                     }
                 }
-                .instrument(span)
                 .boxed()
             }),
         );
@@ -558,20 +550,21 @@ impl Server {
         user: User,
         zed_version: ZedVersion,
         impersonator: Option<User>,
-        mut send_connection_id: Option<oneshot::Sender<ConnectionId>>,
+        send_connection_id: Option<oneshot::Sender<ConnectionId>>,
         executor: Executor,
-    ) -> impl Future<Output = Result<()>> {
+    ) -> impl Future<Output = ()> {
         let this = self.clone();
         let user_id = user.id;
-        let login = user.github_login;
-        let span = info_span!("handle connection", %user_id, %login, %address, impersonator = field::Empty);
+        let login = user.github_login.clone();
+        let span = info_span!("handle connection", %user_id, %login, %address, impersonator = field::Empty, connection_id = field::Empty);
         if let Some(impersonator) = impersonator {
             span.record("impersonator", &impersonator.github_login);
         }
         let mut teardown = self.teardown.subscribe();
         async move {
             if *teardown.borrow() {
-                return Err(anyhow!("server is tearing down"))?;
+                tracing::error!("server is tearing down");
+                return
             }
             let (connection_id, handle_io, mut incoming_rx) = this
                 .peer
@@ -579,40 +572,8 @@ impl Server {
                     let executor = executor.clone();
                     move |duration| executor.sleep(duration)
                 });
-
-            tracing::info!(%user_id, %login, %connection_id, %address, "connection opened");
-            this.peer.send(connection_id, proto::Hello { peer_id: Some(connection_id.into()) })?;
-            tracing::info!(%user_id, %login, %connection_id, %address, "sent hello message");
-
-            if let Some(send_connection_id) = send_connection_id.take() {
-                let _ = send_connection_id.send(connection_id);
-            }
-
-            if !user.connected_once {
-                this.peer.send(connection_id, proto::ShowContacts {})?;
-                this.app_state.db.set_user_connected_once(user_id, true).await?;
-            }
-
-            let (contacts, channels_for_user, channel_invites) = future::try_join3(
-                this.app_state.db.get_contacts(user_id),
-                this.app_state.db.get_channels_for_user(user_id),
-                this.app_state.db.get_channel_invites_for_user(user_id),
-            ).await?;
-
-            {
-                let mut pool = this.connection_pool.lock();
-                pool.add_connection(connection_id, user_id, user.admin, zed_version);
-                this.peer.send(connection_id, build_initial_contacts_update(contacts, &pool))?;
-                this.peer.send(connection_id, build_update_user_channels(&channels_for_user))?;
-                this.peer.send(connection_id, build_channels_update(
-                    channels_for_user,
-                    channel_invites
-                ))?;
-            }
-
-            if let Some(incoming_call) = this.app_state.db.incoming_call_for_user(user_id).await? {
-                this.peer.send(connection_id, incoming_call)?;
-            }
+            tracing::Span::current().record("connection_id", format!("{}", connection_id));
+            tracing::info!("connection opened");
 
             let session = Session {
                 user_id,
@@ -623,7 +584,11 @@ impl Server {
                 live_kit_client: this.app_state.live_kit_client.clone(),
                 _executor: executor.clone()
             };
-            update_user_contacts(user_id, &session).await?;
+
+            if let Err(error) = this.send_initial_client_update(connection_id, user, zed_version, send_connection_id, &session).await {
+                tracing::error!(?error, "failed to send initial client update");
+                return;
+            }
 
             let handle_io = handle_io.fuse();
             futures::pin_mut!(handle_io);
@@ -646,10 +611,10 @@ impl Server {
                 }.fuse();
                 futures::pin_mut!(next_message);
                 futures::select_biased! {
-                    _ = teardown.changed().fuse() => return Ok(()),
+                    _ = teardown.changed().fuse() => return,
                     result = handle_io => {
                         if let Err(error) = result {
-                            tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O");
+                            tracing::error!(?error, "error handling I/O");
                         }
                         break;
                     }
@@ -658,6 +623,8 @@ impl Server {
                         let (permit, message) = next_message;
                         if let Some(message) = message {
                             let type_name = message.payload_type_name();
+                            // note: we copy all the fields from the parent span so we can query them in the logs.
+                            // (https://github.com/tokio-rs/tracing/issues/2670).
                             let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name);
                             let span_enter = span.enter();
                             if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
@@ -675,10 +642,10 @@ impl Server {
                                     foreground_message_handlers.push(handle_message);
                                 }
                             } else {
-                                tracing::error!(%user_id, %login, %connection_id, %address, "no message handler");
+                                tracing::error!("no message handler");
                             }
                         } else {
-                            tracing::info!(%user_id, %login, %connection_id, %address, "connection closed");
+                            tracing::info!("connection closed");
                             break;
                         }
                     }
@@ -686,15 +653,74 @@ impl Server {
             }
 
             drop(foreground_message_handlers);
-            tracing::info!(%user_id, %login, %connection_id, %address, "signing out");
+            tracing::info!("signing out");
             if let Err(error) = connection_lost(session, teardown, executor).await {
-                tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out");
+                tracing::error!(?error, "error signing out");
             }
 
-            Ok(())
         }.instrument(span)
     }
 
+    async fn send_initial_client_update(
+        &self,
+        connection_id: ConnectionId,
+        user: User,
+        zed_version: ZedVersion,
+        mut send_connection_id: Option<oneshot::Sender<ConnectionId>>,
+        session: &Session,
+    ) -> Result<()> {
+        self.peer.send(
+            connection_id,
+            proto::Hello {
+                peer_id: Some(connection_id.into()),
+            },
+        )?;
+        tracing::info!("sent hello message");
+
+        if let Some(send_connection_id) = send_connection_id.take() {
+            let _ = send_connection_id.send(connection_id);
+        }
+
+        if !user.connected_once {
+            self.peer.send(connection_id, proto::ShowContacts {})?;
+            self.app_state
+                .db
+                .set_user_connected_once(user.id, true)
+                .await?;
+        }
+
+        let (contacts, channels_for_user, channel_invites) = future::try_join3(
+            self.app_state.db.get_contacts(user.id),
+            self.app_state.db.get_channels_for_user(user.id),
+            self.app_state.db.get_channel_invites_for_user(user.id),
+        )
+        .await?;
+
+        {
+            let mut pool = self.connection_pool.lock();
+            pool.add_connection(connection_id, user.id, user.admin, zed_version);
+            self.peer.send(
+                connection_id,
+                build_initial_contacts_update(contacts, &pool),
+            )?;
+            self.peer.send(
+                connection_id,
+                build_update_user_channels(&channels_for_user),
+            )?;
+            self.peer.send(
+                connection_id,
+                build_channels_update(channels_for_user, channel_invites),
+            )?;
+        }
+
+        if let Some(incoming_call) = self.app_state.db.incoming_call_for_user(user.id).await? {
+            self.peer.send(connection_id, incoming_call)?;
+        }
+
+        update_user_contacts(user.id, &session).await?;
+        Ok(())
+    }
+
     pub async fn invite_code_redeemed(
         self: &Arc<Self>,
         inviter_id: UserId,
@@ -896,7 +922,6 @@ pub async fn handle_websocket_request(
 
     let socket_address = socket_address.to_string();
     ws.on_upgrade(move |socket| {
-        use util::ResultExt;
         let socket = socket
             .map_ok(to_tungstenite_message)
             .err_into()
@@ -913,8 +938,7 @@ pub async fn handle_websocket_request(
                     None,
                     Executor::Production,
                 )
-                .await
-                .log_err();
+                .await;
         }
     })
 }