diff --git a/Cargo.lock b/Cargo.lock index 60b35ef3c624651fd7cc8e6f52d7e9d645492c19..550b390995db390aabd71c5aafb5e9afa9e6dcb1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -876,6 +876,7 @@ dependencies = [ "tonic", "tower", "tracing", + "tracing-log", "tracing-opentelemetry", "tracing-subscriber", "util", @@ -2590,6 +2591,15 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d" +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata", +] + [[package]] name = "matches" version = "0.1.8" @@ -3736,6 +3746,15 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax", +] + [[package]] name = "regex-syntax" version = "0.6.25" @@ -3848,7 +3867,6 @@ dependencies = [ "collections", "futures", "gpui", - "log", "parking_lot", "prost 0.8.0", "prost-build 0.8.0", @@ -3858,6 +3876,7 @@ dependencies = [ "smol", "smol-timeout", "tempdir", + "tracing", "util", "zstd", ] @@ -5246,9 +5265,9 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" [[package]] name = "tracing" -version = "0.1.26" +version = "0.1.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09adeb8c97449311ccd28a427f96fb563e7fd31aabf994189879d9da2394b89d" +checksum = "5d0ecdcb44a79f0fe9844f0c4f33a342cbcbb5117de8001e6ba0dc2351327d09" dependencies = [ "cfg-if 1.0.0", "log", @@ -5259,9 +5278,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.15" +version = "0.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c42e6fa53307c8a17e4ccd4dc81cf5ec38db9209f59b222210375b54ee40d1e2" +checksum = "cc6b8ad3567499f98a1db7a752b07a7c8c7c7c34c332ec00effb2b0027974b7c" dependencies = [ "proc-macro2", "quote", @@ -5319,9 +5338,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4bc28f93baff38037f64e6f43d34cfa1605f27a49c34e8a04c5e78b0babf2596" dependencies = [ "ansi_term 0.12.1", + "lazy_static", + "matchers", + "regex", "sharded-slab", "smallvec", "thread_local", + "tracing", "tracing-core", "tracing-log", ] diff --git a/crates/collab/Cargo.toml b/crates/collab/Cargo.toml index a5541990d302919956a67bf5cf5f815f794a58de..3ed2beca147408786c1229233ece7e30a11f6ef7 100644 --- a/crates/collab/Cargo.toml +++ b/crates/collab/Cargo.toml @@ -24,11 +24,9 @@ axum = { version = "0.5", features = ["json", "headers", "ws"] } base64 = "0.13" clap = { version = "3.1", features = ["derive"], optional = true } envy = "0.4.2" -env_logger = "0.8" futures = "0.3" lazy_static = "1.4" lipsum = { version = "0.8", optional = true } -log = { version = "0.4.16", features = ["kv_unstable_serde"] } opentelemetry = { version = "0.17", features = ["rt-tokio"] } opentelemetry-otlp = { version = "0.10", features = ["tls-roots"] } parking_lot = "0.11.1" @@ -44,9 +42,10 @@ tokio-tungstenite = "0.17" tonic = "0.6" tower = "0.4" toml = "0.5.8" -tracing = "0.1" +tracing = "0.1.34" +tracing-log = "0.1.3" tracing-opentelemetry = "0.17" -tracing-subscriber = "0.3" +tracing-subscriber = { version = "0.3.11", features = ["env-filter"] } [dependencies.sqlx] version = "0.5.2" @@ -59,6 +58,7 @@ rpc = { path = "../rpc", features = ["test-support"] } client = { path = "../client", features = ["test-support"] } editor = { path = "../editor", features = ["test-support"] } language = { path = "../language", features = ["test-support"] } +log = { version = "0.4.16", features = ["kv_unstable_serde"] } lsp = { path = "../lsp", features = ["test-support"] } project = { path = "../project", features = ["test-support"] } settings = { path = "../settings", features = ["test-support"] } diff --git a/crates/collab/k8s/environments/production.sh b/crates/collab/k8s/environments/production.sh index bac7fbedf7448b1746770ad0fa2fabebe8bd726a..039c1f60a5d0d7bea086cb0352938f86d8913fb6 100644 --- a/crates/collab/k8s/environments/production.sh +++ b/crates/collab/k8s/environments/production.sh @@ -1,3 +1,2 @@ ZED_ENVIRONMENT=production -RUST_LOG=info -TRACE_LEVEL=debug +RUST_LOG=info,rpc=debug diff --git a/crates/collab/k8s/environments/staging.sh b/crates/collab/k8s/environments/staging.sh index ed7121715d80d4b4d3ef9f708f90a2728e3edfb0..ece0851ea1b33a67266be1186486309a298f5650 100644 --- a/crates/collab/k8s/environments/staging.sh +++ b/crates/collab/k8s/environments/staging.sh @@ -1,3 +1,2 @@ ZED_ENVIRONMENT=staging -RUST_LOG=info -TRACE_LEVEL=debug +RUST_LOG=info,rpc=debug diff --git a/crates/collab/k8s/manifest.template.yml b/crates/collab/k8s/manifest.template.yml index 2e9f0ae2984e43e67b23c0a4d4b279bceb4f9091..73b20409fdcf2ac477ff385a7f988f2898e28abb 100644 --- a/crates/collab/k8s/manifest.template.yml +++ b/crates/collab/k8s/manifest.template.yml @@ -83,8 +83,6 @@ spec: key: token - name: RUST_LOG value: ${RUST_LOG} - - name: TRACE_LEVEL - value: ${TRACE_LEVEL} - name: HONEYCOMB_DATASET value: "collab" - name: HONEYCOMB_API_KEY diff --git a/crates/collab/src/auth.rs b/crates/collab/src/auth.rs index aad331faecfa6e05328d6f665862b054ed11fe51..686a07bbf507359db7f8f5fea419bf982d6c9f0a 100644 --- a/crates/collab/src/auth.rs +++ b/crates/collab/src/auth.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use super::db::{self, UserId}; use crate::{AppState, Error}; -use anyhow::{Context, Result}; +use anyhow::{anyhow, Context, Result}; use axum::{ http::{self, Request, StatusCode}, middleware::Next, @@ -51,7 +51,12 @@ pub async fn validate_header(mut req: Request, next: Next) -> impl Into } if credentials_valid { - req.extensions_mut().insert(user_id); + let user = state + .db + .get_user_by_id(user_id) + .await? + .ok_or_else(|| anyhow!("user {} not found", user_id))?; + req.extensions_mut().insert(user); Ok::<_, Error>(next.run(req).await) } else { Err(Error::Http( diff --git a/crates/collab/src/main.rs b/crates/collab/src/main.rs index d7f8905a9a39a25bdc66fb340e1c3583c9fe7a99..bd9f8fd69d166c50be99accbe45640f4c9bcf43e 100644 --- a/crates/collab/src/main.rs +++ b/crates/collab/src/main.rs @@ -11,7 +11,9 @@ use std::{ net::{SocketAddr, TcpListener}, sync::Arc, }; -use tracing::metadata::LevelFilter; +use tracing_log::LogTracer; +use tracing_subscriber::filter::EnvFilter; +use util::ResultExt; #[derive(Default, Deserialize)] pub struct Config { @@ -20,7 +22,7 @@ pub struct Config { pub api_token: String, pub honeycomb_api_key: Option, pub honeycomb_dataset: Option, - pub trace_level: Option, + pub rust_log: Option, } pub struct AppState { @@ -41,10 +43,8 @@ impl AppState { #[tokio::main] async fn main() -> Result<()> { - env_logger::init(); - if let Err(error) = env::load_dotenv() { - log::error!( + eprintln!( "error loading .env.toml (this is expected in production): {}", error ); @@ -119,42 +119,44 @@ pub fn init_tracing(config: &Config) -> Option<()> { use std::str::FromStr; use tracing_opentelemetry::OpenTelemetryLayer; use tracing_subscriber::layer::SubscriberExt; + let rust_log = config.rust_log.clone()?; + + LogTracer::init().log_err()?; - let (honeycomb_api_key, honeycomb_dataset) = config + let open_telemetry_layer = config .honeycomb_api_key .clone() - .zip(config.honeycomb_dataset.clone())?; - - let mut metadata = tonic::metadata::MetadataMap::new(); - metadata.insert("x-honeycomb-team", honeycomb_api_key.parse().unwrap()); - let tracer = opentelemetry_otlp::new_pipeline() - .tracing() - .with_exporter( - opentelemetry_otlp::new_exporter() - .tonic() - .with_endpoint("https://api.honeycomb.io") - .with_metadata(metadata), - ) - .with_trace_config(opentelemetry::sdk::trace::config().with_resource( - opentelemetry::sdk::Resource::new(vec![KeyValue::new( - "service.name", - honeycomb_dataset, - )]), - )) - .install_batch(opentelemetry::runtime::Tokio) - .expect("failed to initialize tracing"); + .zip(config.honeycomb_dataset.clone()) + .map(|(honeycomb_api_key, honeycomb_dataset)| { + let mut metadata = tonic::metadata::MetadataMap::new(); + metadata.insert("x-honeycomb-team", honeycomb_api_key.parse().unwrap()); + let tracer = opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter( + opentelemetry_otlp::new_exporter() + .tonic() + .with_endpoint("https://api.honeycomb.io") + .with_metadata(metadata), + ) + .with_trace_config(opentelemetry::sdk::trace::config().with_resource( + opentelemetry::sdk::Resource::new(vec![KeyValue::new( + "service.name", + honeycomb_dataset, + )]), + )) + .install_batch(opentelemetry::runtime::Tokio) + .expect("failed to initialize tracing"); + + OpenTelemetryLayer::new(tracer) + }); let subscriber = tracing_subscriber::Registry::default() - .with(OpenTelemetryLayer::new(tracer)) - .with(tracing_subscriber::fmt::layer()) + .with(open_telemetry_layer) .with( - config - .trace_level - .as_ref() - .map_or(LevelFilter::INFO, |level| { - LevelFilter::from_str(level).unwrap() - }), - ); + tracing_subscriber::fmt::layer() + .event_format(tracing_subscriber::fmt::format().pretty()), + ) + .with(EnvFilter::from_str(rust_log.as_str()).log_err()?); tracing::subscriber::set_global_default(subscriber).unwrap(); diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 1e7384c2c3dd91c2591ee46725f0ab73c7c99aa3..40c3ba582ab8fdee635ced52b432286cf3abafd1 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -2,7 +2,7 @@ mod store; use crate::{ auth, - db::{self, ChannelId, MessageId, UserId}, + db::{self, ChannelId, MessageId, User, UserId}, AppState, Result, }; use anyhow::anyhow; @@ -49,7 +49,7 @@ use tokio::{ time::Sleep, }; use tower::ServiceBuilder; -use tracing::{info_span, Instrument}; +use tracing::{info_span, instrument, Instrument}; type MessageHandler = Box, Box) -> BoxFuture<'static, ()>>; @@ -244,12 +244,14 @@ impl Server { self: &Arc, connection: Connection, address: String, - user_id: UserId, + user: User, mut send_connection_id: Option>, executor: E, ) -> impl Future> { let mut this = self.clone(); - let span = info_span!("handle connection", %user_id, %address); + let user_id = user.id; + let login = user.github_login; + let span = info_span!("handle connection", %user_id, %login, %address); async move { let (connection_id, handle_io, mut incoming_rx) = this .peer @@ -264,7 +266,7 @@ impl Server { }) .await; - tracing::info!(%user_id, %connection_id, %address, "connection opened"); + tracing::info!(%user_id, %login, %connection_id, %address, "connection opened"); if let Some(send_connection_id) = send_connection_id.as_mut() { let _ = send_connection_id.send(connection_id).await; @@ -287,14 +289,14 @@ impl Server { futures::select_biased! { result = handle_io => { if let Err(error) = result { - tracing::error!(%error, "error handling I/O"); + tracing::error!(?error, %user_id, %login, %connection_id, %address, "error handling I/O"); } break; } message = next_message => { if let Some(message) = message { let type_name = message.payload_type_name(); - let span = tracing::info_span!("receive message", %user_id, %connection_id, %address, type_name); + let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name); async { if let Some(handler) = this.handlers.get(&message.payload_type_id()) { let notifications = this.notifications.clone(); @@ -312,25 +314,27 @@ impl Server { handle_message.await; } } else { - tracing::error!("no message handler"); + tracing::error!(%user_id, %login, %connection_id, %address, "no message handler"); } }.instrument(span).await; } else { - tracing::info!(%user_id, %connection_id, %address, "connection closed"); + tracing::info!(%user_id, %login, %connection_id, %address, "connection closed"); break; } } } } + tracing::info!(%user_id, %login, %connection_id, %address, "signing out"); if let Err(error) = this.sign_out(connection_id).await { - tracing::error!(%error, "error signing out"); + tracing::error!(%user_id, %login, %connection_id, %address, ?error, "error signing out"); } Ok(()) }.instrument(span) } + #[instrument(skip(self), err)] async fn sign_out(self: &mut Arc, connection_id: ConnectionId) -> Result<()> { self.peer.disconnect(connection_id); let removed_connection = self.store_mut().await.remove_connection(connection_id)?; @@ -1116,29 +1120,6 @@ impl Server { Ok(()) } - // #[instrument(skip(self, state, user_ids))] - // fn update_contacts_for_users<'a>( - // self: &Arc, - // state: &Store, - // user_ids: impl IntoIterator, - // ) { - // for user_id in user_ids { - // let contacts = state.contacts_for_user(*user_id); - // for connection_id in state.connection_ids_for_user(*user_id) { - // self.peer - // .send( - // connection_id, - // proto::UpdateContacts { - // contacts: contacts.clone(), - // pending_requests_from_user_ids: Default::default(), - // pending_requests_to_user_ids: Default::default(), - // }, - // ) - // .trace_err(); - // } - // } - // } - async fn join_channel( self: Arc, request: TypedEnvelope, @@ -1443,7 +1424,7 @@ pub async fn handle_websocket_request( TypedHeader(ProtocolVersion(protocol_version)): TypedHeader, ConnectInfo(socket_address): ConnectInfo, Extension(server): Extension>, - Extension(user_id): Extension, + Extension(user): Extension, ws: WebSocketUpgrade, ) -> axum::response::Response { if protocol_version != rpc::PROTOCOL_VERSION { @@ -1463,7 +1444,7 @@ pub async fn handle_websocket_request( let connection = Connection::new(Box::pin(socket)); async move { server - .handle_connection(connection, socket_address, user_id, None, RealExecutor) + .handle_connection(connection, socket_address, user, None, RealExecutor) .await .log_err(); } @@ -6462,6 +6443,7 @@ mod tests { let client_name = name.to_string(); let mut client = Client::new(http.clone()); let server = self.server.clone(); + let db = self.app_state.db.clone(); let connection_killers = self.connection_killers.clone(); let forbid_connections = self.forbid_connections.clone(); let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16); @@ -6482,6 +6464,7 @@ mod tests { assert_eq!(credentials.access_token, "the-token"); let server = server.clone(); + let db = db.clone(); let connection_killers = connection_killers.clone(); let forbid_connections = forbid_connections.clone(); let client_name = client_name.clone(); @@ -6495,11 +6478,12 @@ mod tests { let (client_conn, server_conn, killed) = Connection::in_memory(cx.background()); connection_killers.lock().insert(user_id, killed); + let user = db.get_user_by_id(user_id).await.unwrap().unwrap(); cx.background() .spawn(server.handle_connection( server_conn, client_name, - user_id, + user, Some(connection_id_tx), cx.background(), )) diff --git a/crates/rpc/Cargo.toml b/crates/rpc/Cargo.toml index 2750d9078f3a011e59f6c67e40bd821bac9be013..da018169317474cfab6a4bf23a564841695f95ec 100644 --- a/crates/rpc/Cargo.toml +++ b/crates/rpc/Cargo.toml @@ -21,13 +21,13 @@ async-lock = "2.4" async-tungstenite = "0.16" base64 = "0.13" futures = "0.3" -log = { version = "0.4.16", features = ["kv_unstable_serde"] } parking_lot = "0.11.1" prost = "0.8" rand = "0.8" rsa = "0.4" serde = { version = "1", features = ["derive"] } smol-timeout = "0.6" +tracing = { version = "0.1.34", features = ["log"] } zstd = "0.9" [build-dependencies] diff --git a/crates/rpc/src/peer.rs b/crates/rpc/src/peer.rs index 7d7d1c719495260845391f265770fdde3d76e6a3..2fcc5dc09bcf3c64a0c4bc608545a51a652607e8 100644 --- a/crates/rpc/src/peer.rs +++ b/crates/rpc/src/peer.rs @@ -22,6 +22,7 @@ use std::{ }, time::Duration, }; +use tracing::instrument; #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] pub struct ConnectionId(pub u32); @@ -108,6 +109,7 @@ impl Peer { }) } + #[instrument(skip_all)] pub async fn add_connection( self: &Arc, connection: Connection, @@ -145,9 +147,12 @@ impl Peer { let this = self.clone(); let response_channels = connection_state.response_channels.clone(); let handle_io = async move { + tracing::debug!(%connection_id, "handle io future: start"); + let _end_connection = util::defer(|| { response_channels.lock().take(); this.connections.write().remove(&connection_id); + tracing::debug!(%connection_id, "handle io future: end"); }); // Send messages on this frequency so the connection isn't closed. @@ -159,49 +164,68 @@ impl Peer { futures::pin_mut!(receive_timeout); loop { + tracing::debug!(%connection_id, "outer loop iteration start"); let read_message = reader.read().fuse(); futures::pin_mut!(read_message); loop { + tracing::debug!(%connection_id, "inner loop iteration start"); futures::select_biased! { outgoing = outgoing_rx.next().fuse() => match outgoing { Some(outgoing) => { + tracing::debug!(%connection_id, "outgoing rpc message: writing"); if let Some(result) = writer.write(outgoing).timeout(WRITE_TIMEOUT).await { + tracing::debug!(%connection_id, "outgoing rpc message: done writing"); result.context("failed to write RPC message")?; + tracing::debug!(%connection_id, "keepalive interval: resetting after sending message"); keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse()); } else { + tracing::debug!(%connection_id, "outgoing rpc message: writing timed out"); Err(anyhow!("timed out writing message"))?; } } None => { - log::info!("outgoing channel closed"); + tracing::debug!(%connection_id, "outgoing rpc message: channel closed"); return Ok(()) }, }, incoming = read_message => { - let incoming = incoming.context("received invalid RPC message")?; + let incoming = incoming.context("error reading rpc message from socket")?; + tracing::debug!(%connection_id, "incoming rpc message: received"); + tracing::debug!(%connection_id, "receive timeout: resetting"); receive_timeout.set(create_timer(RECEIVE_TIMEOUT).fuse()); if let proto::Message::Envelope(incoming) = incoming { + tracing::debug!(%connection_id, "incoming rpc message: processing"); match incoming_tx.send(incoming).timeout(RECEIVE_TIMEOUT).await { - Some(Ok(_)) => {}, + Some(Ok(_)) => { + tracing::debug!(%connection_id, "incoming rpc message: processed"); + }, Some(Err(_)) => { - log::info!("incoming channel closed"); + tracing::debug!(%connection_id, "incoming rpc message: channel closed"); return Ok(()) }, - None => Err(anyhow!("timed out processing incoming message"))?, + None => { + tracing::debug!(%connection_id, "incoming rpc message: processing timed out"); + Err(anyhow!("timed out processing incoming message"))? + }, } } break; }, _ = keepalive_timer => { + tracing::debug!(%connection_id, "keepalive interval: pinging"); if let Some(result) = writer.write(proto::Message::Ping).timeout(WRITE_TIMEOUT).await { + tracing::debug!(%connection_id, "keepalive interval: done pinging"); result.context("failed to send keepalive")?; + tracing::debug!(%connection_id, "keepalive interval: resetting after pinging"); keepalive_timer.set(create_timer(KEEPALIVE_INTERVAL).fuse()); } else { + tracing::debug!(%connection_id, "keepalive interval: pinging timed out"); Err(anyhow!("timed out sending keepalive"))?; } } _ = receive_timeout => { + tracing::debug!(%connection_id, "receive timeout: delay between messages too long"); Err(anyhow!("delay between messages too long"))? } } @@ -217,25 +241,71 @@ impl Peer { let incoming_rx = incoming_rx.filter_map(move |incoming| { let response_channels = response_channels.clone(); async move { + let message_id = incoming.id; + tracing::debug!(?incoming, "incoming message future: start"); + let _end = util::defer(move || { + tracing::debug!( + %connection_id, + message_id, + "incoming message future: end" + ); + }); + if let Some(responding_to) = incoming.responding_to { + tracing::debug!( + %connection_id, + message_id, + responding_to, + "incoming response: received" + ); let channel = response_channels.lock().as_mut()?.remove(&responding_to); if let Some(tx) = channel { let requester_resumed = oneshot::channel(); if let Err(error) = tx.send((incoming, requester_resumed.0)) { - log::debug!( - "received RPC but request future was dropped {:?}", - error.0 + tracing::debug!( + %connection_id, + message_id, + responding_to = responding_to, + ?error, + "incoming response: request future dropped", ); } + + tracing::debug!( + %connection_id, + message_id, + responding_to, + "incoming response: waiting to resume requester" + ); let _ = requester_resumed.1.await; + tracing::debug!( + %connection_id, + message_id, + responding_to, + "incoming response: requester resumed" + ); } else { - log::warn!("received RPC response to unknown request {}", responding_to); + tracing::warn!( + %connection_id, + message_id, + responding_to, + "incoming response: unknown request" + ); } None } else { + tracing::debug!( + %connection_id, + message_id, + "incoming message: received" + ); proto::build_typed_envelope(connection_id, incoming).or_else(|| { - log::error!("unable to construct a typed envelope"); + tracing::error!( + %connection_id, + message_id, + "unable to construct a typed envelope" + ); None }) }