WIP

Nathan Sobo created

Change summary

Cargo.lock                | 13 ++++++++++++
crates/collab/Cargo.toml  |  5 ++-
crates/collab/src/main.rs | 44 ++++++++++++++++++++++++++++++----------
crates/collab/src/rpc.rs  | 29 ++++++++++++++++++++------
4 files changed, 71 insertions(+), 20 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -869,6 +869,7 @@ dependencies = [
  "theme",
  "time 0.2.27",
  "tokio",
+ "tokio-stream",
  "tokio-tungstenite",
  "toml",
  "tonic",
@@ -1180,6 +1181,16 @@ dependencies = [
  "winapi 0.3.9",
 ]
 
+[[package]]
+name = "dashmap"
+version = "4.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e77a43b28d0668df09411cb0bc9a8c2adc40f9a048afe863e05fd43251e8e39c"
+dependencies = [
+ "cfg-if 1.0.0",
+ "num_cpus",
+]
+
 [[package]]
 name = "data-url"
 version = "0.1.0"
@@ -2940,6 +2951,8 @@ checksum = "6105e89802af13fdf48c49d7646d3b533a70e536d818aae7e78ba0433d01acb8"
 dependencies = [
  "async-trait",
  "crossbeam-channel 0.5.0",
+ "dashmap",
+ "fnv",
  "futures-channel",
  "futures-executor",
  "futures-util",

crates/collab/Cargo.toml 🔗

@@ -28,8 +28,8 @@ futures = "0.3"
 lazy_static = "1.4"
 lipsum = { version = "0.8", optional = true }
 log = { version = "0.4.16", features = ["kv_unstable_serde"] }
-opentelemetry = { version = "0.17", features = ["rt-tokio"] }
-opentelemetry-otlp = { version = "0.10", features = ["tls-roots"] }
+opentelemetry = { version = "0.17", features = ["metrics", "rt-tokio"] }
+opentelemetry-otlp = { version = "0.10", features = ["metrics", "tls-roots"] }
 parking_lot = "0.11.1"
 rand = "0.8"
 scrypt = "0.7"
@@ -38,6 +38,7 @@ serde_json = "1.0"
 sha-1 = "0.9"
 time = "0.2"
 tokio = { version = "1", features = ["full"] }
+tokio-stream = "0.1"
 tokio-tungstenite = "0.17"
 tonic = "0.6"
 tower = "0.4"

crates/collab/src/main.rs 🔗

@@ -6,11 +6,13 @@ mod rpc;
 
 use axum::{body::Body, http::StatusCode, response::IntoResponse, Router};
 use db::{Db, PostgresDb};
+use opentelemetry::sdk::metrics::PushController;
 use serde::Deserialize;
 use std::{
     net::{SocketAddr, TcpListener},
     sync::Arc,
 };
+use tokio_stream::wrappers::IntervalStream;
 use tracing::metadata::LevelFilter;
 
 #[derive(Default, Deserialize)]
@@ -51,7 +53,7 @@ async fn main() -> Result<()> {
     }
 
     let config = envy::from_env::<Config>().expect("error loading config");
-    init_tracing(&config);
+    let _metrics_push_controller = init_telemetry(&config);
     let state = AppState::new(&config).await?;
 
     let listener = TcpListener::bind(&format!("0.0.0.0:{}", config.http_port))
@@ -113,7 +115,7 @@ impl std::fmt::Display for Error {
     }
 }
 
-pub fn init_tracing(config: &Config) -> Option<()> {
+pub fn init_telemetry(config: &Config) -> Option<PushController> {
     use opentelemetry::KeyValue;
     use opentelemetry_otlp::WithExportConfig;
     use std::str::FromStr;
@@ -124,23 +126,23 @@ pub fn init_tracing(config: &Config) -> Option<()> {
         .honeycomb_api_key
         .clone()
         .zip(config.honeycomb_dataset.clone())?;
-
     let mut metadata = tonic::metadata::MetadataMap::new();
     metadata.insert("x-honeycomb-team", honeycomb_api_key.parse().unwrap());
+
+    let service_name = KeyValue::new("service.name", honeycomb_dataset.clone());
+
     let tracer = opentelemetry_otlp::new_pipeline()
         .tracing()
         .with_exporter(
             opentelemetry_otlp::new_exporter()
                 .tonic()
                 .with_endpoint("https://api.honeycomb.io")
-                .with_metadata(metadata),
+                .with_metadata(metadata.clone()),
+        )
+        .with_trace_config(
+            opentelemetry::sdk::trace::config()
+                .with_resource(opentelemetry::sdk::Resource::new([service_name.clone()])),
         )
-        .with_trace_config(opentelemetry::sdk::trace::config().with_resource(
-            opentelemetry::sdk::Resource::new(vec![KeyValue::new(
-                "service.name",
-                honeycomb_dataset,
-            )]),
-        ))
         .install_batch(opentelemetry::runtime::Tokio)
         .expect("failed to initialize tracing");
 
@@ -158,5 +160,25 @@ pub fn init_tracing(config: &Config) -> Option<()> {
 
     tracing::subscriber::set_global_default(subscriber).unwrap();
 
-    None
+    // metadata.insert("x-honeycomb-dataset", "collab_metrics".parse().unwrap());
+    // let push_controller = opentelemetry_otlp::new_pipeline()
+    //     .metrics(tokio::spawn, |duration| {
+    //         IntervalStream::new(tokio::time::interval(duration))
+    //     })
+    //     .with_exporter(
+    //         opentelemetry_otlp::new_exporter()
+    //             .tonic()
+    //             .with_endpoint("https://api.honeycomb.io")
+    //             .with_metadata(metadata.clone()),
+    //     )
+    //     .with_resource([service_name])
+    //     .build()
+    //     .unwrap();
+
+    let push_controller = opentelemetry::sdk::export::metrics::stdout(tokio::spawn, |duration| {
+        IntervalStream::new(tokio::time::interval(duration))
+    })
+    .init();
+
+    Some(push_controller)
 }

crates/collab/src/rpc.rs 🔗

@@ -25,6 +25,7 @@ use axum::{
 use collections::{HashMap, HashSet};
 use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
 use lazy_static::lazy_static;
+use opentelemetry::metrics::{Meter, ValueRecorder};
 use rpc::{
     proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
     Connection, ConnectionId, Peer, TypedEnvelope,
@@ -48,6 +49,16 @@ use tokio::{
 use tower::ServiceBuilder;
 use tracing::{info_span, instrument, Instrument};
 
+lazy_static! {
+    static ref METER: Meter = opentelemetry::global::meter("");
+    static ref CONNECTIONS_COUNTER: opentelemetry::metrics::Counter<u64> =
+        METER.u64_counter("connections").init();
+    // static ref REGISTERED_PROJECTS_GAUGE: ValueRecorder<u64> =
+    //     METER.u64_value_recorder("registered projects").init();
+    // static ref SHARED_PROJECTS_GAUGE: ValueRecorder<u64> =
+    //     METER.u64_value_recorder("shared projects").init();
+}
+
 type MessageHandler =
     Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
 
@@ -1096,13 +1107,17 @@ impl<'a> Drop for StoreWriteGuard<'a> {
         self.check_invariants();
 
         let metrics = self.metrics();
-        tracing::info!(
-            connections = metrics.connections as f32,
-            registered_projects = metrics.registered_projects as f32,
-            shared_projects = metrics.shared_projects as f32,
-            collaborators_per_project = metrics.collaborators_per_project,
-            "metrics"
-        );
+
+        CONNECTIONS_COUNTER.add(1, &[]);
+        // CONNECTIONS_COUNTER. record(metrics.connections as u64 * 2, &[]);
+
+        // METER.record_batch(
+        //     &[],
+        //     [
+        //         REGISTERED_PROJECTS_GAUGE.measurement(metrics.registered_projects as u64),
+        //         SHARED_PROJECTS_GAUGE.measurement(metrics.shared_projects as u64),
+        //     ],
+        // );
     }
 }