diff --git a/crates/collab/src/main.rs b/crates/collab/src/main.rs index e4b134b06b5a7cde2e075c121387990ff0416108..4ac872de445300ef93a94a35435f65b6c1a5a14e 100644 --- a/crates/collab/src/main.rs +++ b/crates/collab/src/main.rs @@ -1,5 +1,10 @@ use anyhow::anyhow; -use axum::{extract::MatchedPath, http::Request, routing::get, Extension, Router}; +use axum::{ + extract::MatchedPath, + http::{Request, Response}, + routing::get, + Extension, Router, +}; use collab::{ api::fetch_extensions_from_blob_store_periodically, db, env, executor::Executor, AppState, Config, MigrateConfig, Result, @@ -10,11 +15,11 @@ use std::{ net::{SocketAddr, TcpListener}, path::Path, sync::Arc, + time::Duration, }; #[cfg(unix)] use tokio::signal::unix::SignalKind; -use tower_http::trace::{self, TraceLayer}; -use tracing::Level; +use tower_http::trace::TraceLayer; use tracing_subscriber::{ filter::EnvFilter, fmt::format::JsonFields, util::SubscriberInitExt, Layer, }; @@ -107,7 +112,16 @@ async fn main() -> Result<()> { matched_path, ) }) - .on_response(trace::DefaultOnResponse::new().level(Level::INFO)), + .on_response( + |response: &Response<_>, latency: Duration, _: &tracing::Span| { + let duration_ms = latency.as_micros() as f64 / 1000.; + tracing::info!( + duration_ms, + status = response.status().as_u16(), + "finished processing request" + ); + }, + ), ); #[cfg(unix)] @@ -192,7 +206,7 @@ pub fn init_tracing(config: &Config) -> Option<()> { tracing_subscriber::fmt::format() .json() .flatten_event(true) - .with_span_list(true), + .with_span_list(false), ) .with_filter(filter), ) as Box + Send + Sync> diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 6102534e1f4ad34ac994170d2fd3fbf0a6c84c23..6700ad8ccce787934e51b59743305840a671af42 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -467,16 +467,9 @@ impl Server { Box::new(move |envelope, session| { let envelope = envelope.into_any().downcast::>().unwrap(); let received_at = envelope.received_at; - let span = info_span!( - "handle message", - payload_type = envelope.payload_type_name() - ); - span.in_scope(|| { tracing::info!( - payload_type = envelope.payload_type_name(), "message received" ); - }); let start_time = Instant::now(); let future = (handler)(*envelope, session); async move { @@ -491,7 +484,6 @@ impl Server { Ok(()) => tracing::info!(total_duration_ms, processing_duration_ms, queue_duration_ms, "finished handling message"), } } - .instrument(span) .boxed() }), ); @@ -558,20 +550,21 @@ impl Server { user: User, zed_version: ZedVersion, impersonator: Option, - mut send_connection_id: Option>, + send_connection_id: Option>, executor: Executor, - ) -> impl Future> { + ) -> impl Future { let this = self.clone(); let user_id = user.id; - let login = user.github_login; - let span = info_span!("handle connection", %user_id, %login, %address, impersonator = field::Empty); + let login = user.github_login.clone(); + let span = info_span!("handle connection", %user_id, %login, %address, impersonator = field::Empty, connection_id = field::Empty); if let Some(impersonator) = impersonator { span.record("impersonator", &impersonator.github_login); } let mut teardown = self.teardown.subscribe(); async move { if *teardown.borrow() { - return Err(anyhow!("server is tearing down"))?; + tracing::error!("server is tearing down"); + return } let (connection_id, handle_io, mut incoming_rx) = this .peer @@ -579,40 +572,8 @@ impl Server { let executor = executor.clone(); move |duration| executor.sleep(duration) }); - - tracing::info!(%user_id, %login, %connection_id, %address, "connection opened"); - this.peer.send(connection_id, proto::Hello { peer_id: Some(connection_id.into()) })?; - tracing::info!(%user_id, %login, %connection_id, %address, "sent hello message"); - - if let Some(send_connection_id) = send_connection_id.take() { - let _ = send_connection_id.send(connection_id); - } - - if !user.connected_once { - this.peer.send(connection_id, proto::ShowContacts {})?; - this.app_state.db.set_user_connected_once(user_id, true).await?; - } - - let (contacts, channels_for_user, channel_invites) = future::try_join3( - this.app_state.db.get_contacts(user_id), - this.app_state.db.get_channels_for_user(user_id), - this.app_state.db.get_channel_invites_for_user(user_id), - ).await?; - - { - let mut pool = this.connection_pool.lock(); - pool.add_connection(connection_id, user_id, user.admin, zed_version); - this.peer.send(connection_id, build_initial_contacts_update(contacts, &pool))?; - this.peer.send(connection_id, build_update_user_channels(&channels_for_user))?; - this.peer.send(connection_id, build_channels_update( - channels_for_user, - channel_invites - ))?; - } - - if let Some(incoming_call) = this.app_state.db.incoming_call_for_user(user_id).await? { - this.peer.send(connection_id, incoming_call)?; - } + tracing::Span::current().record("connection_id", format!("{}", connection_id)); + tracing::info!("connection opened"); let session = Session { user_id, @@ -623,7 +584,11 @@ impl Server { live_kit_client: this.app_state.live_kit_client.clone(), _executor: executor.clone() }; - update_user_contacts(user_id, &session).await?; + + if let Err(error) = this.send_initial_client_update(connection_id, user, zed_version, send_connection_id, &session).await { + tracing::error!(?error, "failed to send initial client update"); + return; + } let handle_io = handle_io.fuse(); futures::pin_mut!(handle_io); @@ -646,10 +611,10 @@ impl Server { }.fuse(); futures::pin_mut!(next_message); futures::select_biased! { - _ = teardown.changed().fuse() => return Ok(()), + _ = teardown.changed().fuse() => return, result = handle_io => { if let Err(error) = result { - tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O"); + tracing::error!(?error, "error handling I/O"); } break; } @@ -658,6 +623,8 @@ impl Server { let (permit, message) = next_message; if let Some(message) = message { let type_name = message.payload_type_name(); + // note: we copy all the fields from the parent span so we can query them in the logs. + // (https://github.com/tokio-rs/tracing/issues/2670). let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name); let span_enter = span.enter(); if let Some(handler) = this.handlers.get(&message.payload_type_id()) { @@ -675,10 +642,10 @@ impl Server { foreground_message_handlers.push(handle_message); } } else { - tracing::error!(%user_id, %login, %connection_id, %address, "no message handler"); + tracing::error!("no message handler"); } } else { - tracing::info!(%user_id, %login, %connection_id, %address, "connection closed"); + tracing::info!("connection closed"); break; } } @@ -686,15 +653,74 @@ impl Server { } drop(foreground_message_handlers); - tracing::info!(%user_id, %login, %connection_id, %address, "signing out"); + tracing::info!("signing out"); if let Err(error) = connection_lost(session, teardown, executor).await { - tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out"); + tracing::error!(?error, "error signing out"); } - Ok(()) }.instrument(span) } + async fn send_initial_client_update( + &self, + connection_id: ConnectionId, + user: User, + zed_version: ZedVersion, + mut send_connection_id: Option>, + session: &Session, + ) -> Result<()> { + self.peer.send( + connection_id, + proto::Hello { + peer_id: Some(connection_id.into()), + }, + )?; + tracing::info!("sent hello message"); + + if let Some(send_connection_id) = send_connection_id.take() { + let _ = send_connection_id.send(connection_id); + } + + if !user.connected_once { + self.peer.send(connection_id, proto::ShowContacts {})?; + self.app_state + .db + .set_user_connected_once(user.id, true) + .await?; + } + + let (contacts, channels_for_user, channel_invites) = future::try_join3( + self.app_state.db.get_contacts(user.id), + self.app_state.db.get_channels_for_user(user.id), + self.app_state.db.get_channel_invites_for_user(user.id), + ) + .await?; + + { + let mut pool = self.connection_pool.lock(); + pool.add_connection(connection_id, user.id, user.admin, zed_version); + self.peer.send( + connection_id, + build_initial_contacts_update(contacts, &pool), + )?; + self.peer.send( + connection_id, + build_update_user_channels(&channels_for_user), + )?; + self.peer.send( + connection_id, + build_channels_update(channels_for_user, channel_invites), + )?; + } + + if let Some(incoming_call) = self.app_state.db.incoming_call_for_user(user.id).await? { + self.peer.send(connection_id, incoming_call)?; + } + + update_user_contacts(user.id, &session).await?; + Ok(()) + } + pub async fn invite_code_redeemed( self: &Arc, inviter_id: UserId, @@ -896,7 +922,6 @@ pub async fn handle_websocket_request( let socket_address = socket_address.to_string(); ws.on_upgrade(move |socket| { - use util::ResultExt; let socket = socket .map_ok(to_tungstenite_message) .err_into() @@ -913,8 +938,7 @@ pub async fn handle_websocket_request( None, Executor::Production, ) - .await - .log_err(); + .await; } }) }