Start to send data to Snowflake too (#20698)

Conrad Irwin created

This PR adds support for sending telemetry events to AWS Kinesis.

In our AWS account we now have three new things:
* The [Kinesis data
stream](https://us-east-1.console.aws.amazon.com/kinesis/home?region=us-east-1#/streams/details/zed-telemetry/monitoring)
that we will actually write to.
* A [Firehose for
Axiom](https://us-east-1.console.aws.amazon.com/firehose/home?region=us-east-1#/details/telemetry-to-axiom/monitoring)
that sends events from that stream to Axiom for ad-hoc queries over
recent data.
* A [Firehose for
Snowflake](https://us-east-1.console.aws.amazon.com/firehose/home?region=us-east-1#/details/telemetry-to-snowflake/monitoring)
that sends events from that stream to Snowflake for long-term retention.
This Firehose also backs up data into an S3 bucket in case we want to
change how the system works in the future.

In a follow-up PR, we'll add support for ad-hoc telemetry events and
slowly move away from the current ClickHouse-defined schemas, though we
won't move off ClickHouse until we have what we need in Snowflake.

Co-Authored-By: Nathan <nathan@zed.dev>

Release Notes:

- N/A

Change summary

Cargo.lock                                      |  59 +++-
crates/collab/Cargo.toml                        |   1 
crates/collab/k8s/collab.template.yml           |  25 ++
crates/collab/src/api/events.rs                 | 196 ++++++++++++++++++
crates/collab/src/lib.rs                        |  43 ++++
crates/collab/src/tests/test_server.rs          |   5 
crates/telemetry_events/src/telemetry_events.rs |   4 
7 files changed, 305 insertions(+), 28 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -1099,9 +1099,9 @@ dependencies = [
 
 [[package]]
 name = "aws-runtime"
-version = "1.4.2"
+version = "1.4.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2424565416eef55906f9f8cece2072b6b6a76075e3ff81483ebe938a89a4c05f"
+checksum = "a10d5c055aa540164d9561a0e2e74ad30f0dcf7393c3a92f6733ddf9c5762468"
 dependencies = [
  "aws-credential-types",
  "aws-sigv4",
@@ -1123,6 +1123,28 @@ dependencies = [
  "uuid",
 ]
 
+[[package]]
+name = "aws-sdk-kinesis"
+version = "1.51.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ad48026d3d53881146469b36358d633f1b8c9ad6eb3033f348600f981f2f449b"
+dependencies = [
+ "aws-credential-types",
+ "aws-runtime",
+ "aws-smithy-async",
+ "aws-smithy-http",
+ "aws-smithy-json",
+ "aws-smithy-runtime",
+ "aws-smithy-runtime-api",
+ "aws-smithy-types",
+ "aws-types",
+ "bytes 1.7.2",
+ "http 0.2.12",
+ "once_cell",
+ "regex-lite",
+ "tracing",
+]
+
 [[package]]
 name = "aws-sdk-s3"
 version = "1.47.0"
@@ -1227,9 +1249,9 @@ dependencies = [
 
 [[package]]
 name = "aws-sigv4"
-version = "1.2.3"
+version = "1.2.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5df1b0fa6be58efe9d4ccc257df0a53b89cd8909e86591a13ca54817c87517be"
+checksum = "5619742a0d8f253be760bfbb8e8e8368c69e3587e4637af5754e488a611499b1"
 dependencies = [
  "aws-credential-types",
  "aws-smithy-eventstream",
@@ -1288,9 +1310,9 @@ dependencies = [
 
 [[package]]
 name = "aws-smithy-eventstream"
-version = "0.60.4"
+version = "0.60.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e6363078f927f612b970edf9d1903ef5cef9a64d1e8423525ebb1f0a1633c858"
+checksum = "cef7d0a272725f87e51ba2bf89f8c21e4df61b9e49ae1ac367a6d69916ef7c90"
 dependencies = [
  "aws-smithy-types",
  "bytes 1.7.2",
@@ -1299,9 +1321,9 @@ dependencies = [
 
 [[package]]
 name = "aws-smithy-http"
-version = "0.60.10"
+version = "0.60.11"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "01dbcb6e2588fd64cfb6d7529661b06466419e4c54ed1c62d6510d2d0350a728"
+checksum = "5c8bc3e8fdc6b8d07d976e301c02fe553f72a39b7a9fea820e023268467d7ab6"
 dependencies = [
  "aws-smithy-eventstream",
  "aws-smithy-runtime-api",
@@ -1339,9 +1361,9 @@ dependencies = [
 
 [[package]]
 name = "aws-smithy-runtime"
-version = "1.7.1"
+version = "1.7.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d1ce695746394772e7000b39fe073095db6d45a862d0767dd5ad0ac0d7f8eb87"
+checksum = "be28bd063fa91fd871d131fc8b68d7cd4c5fa0869bea68daca50dcb1cbd76be2"
 dependencies = [
  "aws-smithy-async",
  "aws-smithy-http",
@@ -1366,9 +1388,9 @@ dependencies = [
 
 [[package]]
 name = "aws-smithy-runtime-api"
-version = "1.7.2"
+version = "1.7.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e086682a53d3aa241192aa110fa8dfce98f2f5ac2ead0de84d41582c7e8fdb96"
+checksum = "92165296a47a812b267b4f41032ff8069ab7ff783696d217f0994a0d7ab585cd"
 dependencies = [
  "aws-smithy-async",
  "aws-smithy-types",
@@ -1383,9 +1405,9 @@ dependencies = [
 
 [[package]]
 name = "aws-smithy-types"
-version = "1.2.4"
+version = "1.2.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "273dcdfd762fae3e1650b8024624e7cd50e484e37abdab73a7a706188ad34543"
+checksum = "4fbd94a32b3a7d55d3806fe27d98d3ad393050439dd05eb53ece36ec5e3d3510"
 dependencies = [
  "base64-simd",
  "bytes 1.7.2",
@@ -1591,7 +1613,7 @@ dependencies = [
  "bitflags 2.6.0",
  "cexpr",
  "clang-sys",
- "itertools 0.10.5",
+ "itertools 0.12.1",
  "lazy_static",
  "lazycell",
  "proc-macro2",
@@ -2560,6 +2582,7 @@ dependencies = [
  "async-tungstenite",
  "audio",
  "aws-config",
+ "aws-sdk-kinesis",
  "aws-sdk-s3",
  "axum",
  "axum-extra",
@@ -5644,7 +5667,7 @@ dependencies = [
  "httpdate",
  "itoa",
  "pin-project-lite",
- "socket2 0.4.10",
+ "socket2 0.5.7",
  "tokio",
  "tower-service",
  "tracing",
@@ -6553,7 +6576,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4"
 dependencies = [
  "cfg-if",
- "windows-targets 0.48.5",
+ "windows-targets 0.52.6",
 ]
 
 [[package]]
@@ -14228,7 +14251,7 @@ version = "0.1.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
 dependencies = [
- "windows-sys 0.48.0",
+ "windows-sys 0.59.0",
 ]
 
 [[package]]

crates/collab/Cargo.toml 🔗

@@ -24,6 +24,7 @@ async-stripe.workspace = true
 async-tungstenite.workspace = true
 aws-config = { version = "1.1.5" }
 aws-sdk-s3 = { version = "1.15.0" }
+aws-sdk-kinesis = "1.51.0"
 axum = { version = "0.6", features = ["json", "headers", "ws"] }
 axum-extra = { version = "0.4", features = ["erased-json"] }
 base64.workspace = true

crates/collab/k8s/collab.template.yml 🔗

@@ -174,6 +174,31 @@ spec:
                 secretKeyRef:
                   name: blob-store
                   key: bucket
+            - name: KINESIS_ACCESS_KEY
+              valueFrom:
+                secretKeyRef:
+                  name: kinesis
+                  key: access_key
+            - name: KINESIS_SECRET_KEY
+              valueFrom:
+                secretKeyRef:
+                  name: kinesis
+                  key: secret_key
+            - name: KINESIS_STREAM
+              valueFrom:
+                secretKeyRef:
+                  name: kinesis
+                  key: stream
+            - name: KINESIS_REGION
+              valueFrom:
+                secretKeyRef:
+                  name: kinesis
+                  key: region
+            - name: BLOB_STORE_BUCKET
+              valueFrom:
+                secretKeyRef:
+                  name: blob-store
+                  key: bucket
             - name: CLICKHOUSE_URL
               valueFrom:
                 secretKeyRef:

crates/collab/src/api/events.rs 🔗

@@ -11,9 +11,10 @@ use axum::{
     routing::post,
     Extension, Router, TypedHeader,
 };
+use chrono::Duration;
 use rpc::ExtensionMetadata;
 use semantic_version::SemanticVersion;
-use serde::{Serialize, Serializer};
+use serde::{Deserialize, Serialize, Serializer};
 use sha2::{Digest, Sha256};
 use std::sync::{Arc, OnceLock};
 use telemetry_events::{
@@ -21,6 +22,7 @@ use telemetry_events::{
     EventRequestBody, EventWrapper, ExtensionEvent, InlineCompletionEvent, MemoryEvent, Panic,
     ReplEvent, SettingEvent,
 };
+use util::ResultExt;
 use uuid::Uuid;
 
 const CRASH_REPORTS_BUCKET: &str = "zed-crash-reports";
@@ -388,13 +390,6 @@ pub async fn post_events(
     country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
     body: Bytes,
 ) -> Result<()> {
-    let Some(clickhouse_client) = app.clickhouse_client.clone() else {
-        Err(Error::http(
-            StatusCode::NOT_IMPLEMENTED,
-            "not supported".into(),
-        ))?
-    };
-
     let Some(expected) = calculate_json_checksum(app.clone(), &body) else {
         return Err(Error::http(
             StatusCode::INTERNAL_SERVER_ERROR,
@@ -416,6 +411,35 @@ pub async fn post_events(
     };
     let country_code = country_code_header.map(|h| h.to_string());
 
+    let first_event_at = chrono::Utc::now()
+        - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
+
+    if let Some(kinesis_client) = app.kinesis_client.clone() {
+        if let Some(stream) = app.config.kinesis_stream.clone() {
+            let mut request = kinesis_client.put_records().stream_name(stream);
+            for row in for_snowflake(request_body.clone(), first_event_at) {
+                if let Some(data) = serde_json::to_vec(&row).log_err() {
+                    println!("{}", String::from_utf8_lossy(&data));
+                    request = request.records(
+                        aws_sdk_kinesis::types::PutRecordsRequestEntry::builder()
+                            .partition_key(request_body.system_id.clone().unwrap_or_default())
+                            .data(data.into())
+                            .build()
+                            .unwrap(),
+                    );
+                }
+            }
+            request.send().await.log_err();
+        }
+    };
+
+    let Some(clickhouse_client) = app.clickhouse_client.clone() else {
+        Err(Error::http(
+            StatusCode::NOT_IMPLEMENTED,
+            "not supported".into(),
+        ))?
+    };
+
     let first_event_at = chrono::Utc::now()
         - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
 
@@ -1364,3 +1388,159 @@ pub fn calculate_json_checksum(app: Arc<AppState>, json: &impl AsRef<[u8]>) -> O
     summer.update(checksum_seed);
     Some(summer.finalize().into_iter().collect())
 }
+
+/// Converts every event in a telemetry request into a [`SnowflakeRow`].
+///
+/// Each row gets:
+/// * `event`: a name derived from the event variant (for example
+///   `editor_<operation>`, `inline_completion_accept`, `system_cpu`);
+/// * `system_id`: copied from the request body;
+/// * `timestamp`: `first_event_at` plus the event's
+///   `milliseconds_since_first_event`;
+/// * `data`: the request-level metadata (cloned per row) plus the event's own
+///   payload stored in the single matching `*_event` field; every other
+///   `*_event` field is `None`.
+///
+/// The returned iterator is lazy and owns `body`.
+fn for_snowflake(
+    body: EventRequestBody,
+    first_event_at: chrono::DateTime<chrono::Utc>,
+) -> impl Iterator<Item = SnowflakeRow> {
+    body.events.into_iter().map(move |event| SnowflakeRow {
+        event: match &event.event {
+            Event::Editor(editor_event) => format!("editor_{}", editor_event.operation),
+            Event::InlineCompletion(inline_completion_event) => format!(
+                "inline_completion_{}",
+                // No padding here: the suffix becomes part of the event name.
+                if inline_completion_event.suggestion_accepted {
+                    "accept"
+                } else {
+                    "discard"
+                }
+            ),
+            Event::Call(call_event) => format!("call_{}", call_event.operation.replace(" ", "_")),
+            Event::Assistant(assistant_event) => {
+                format!(
+                    "assistant_{}",
+                    match assistant_event.phase {
+                        telemetry_events::AssistantPhase::Response => "response",
+                        telemetry_events::AssistantPhase::Invoked => "invoke",
+                        telemetry_events::AssistantPhase::Accepted => "accept",
+                        telemetry_events::AssistantPhase::Rejected => "reject",
+                    }
+                )
+            }
+            Event::Cpu(_) => "system_cpu".to_string(),
+            Event::Memory(_) => "system_memory".to_string(),
+            Event::App(app_event) => app_event.operation.replace(" ", "_"),
+            Event::Setting(_) => "setting_change".to_string(),
+            Event::Extension(_) => "extension_load".to_string(),
+            Event::Edit(_) => "edit".to_string(),
+            Event::Action(_) => "command_palette_action".to_string(),
+            Event::Repl(_) => "repl".to_string(),
+        },
+        system_id: body.system_id.clone(),
+        timestamp: first_event_at + Duration::milliseconds(event.milliseconds_since_first_event),
+        data: SnowflakeData {
+            installation_id: body.installation_id.clone(),
+            session_id: body.session_id.clone(),
+            metrics_id: body.metrics_id.clone(),
+            is_staff: body.is_staff,
+            app_version: body.app_version.clone(),
+            os_name: body.os_name.clone(),
+            os_version: body.os_version.clone(),
+            architecture: body.architecture.clone(),
+            release_channel: body.release_channel.clone(),
+            signed_in: event.signed_in,
+            editor_event: match &event.event {
+                Event::Editor(editor_event) => Some(editor_event.clone()),
+                _ => None,
+            },
+            inline_completion_event: match &event.event {
+                Event::InlineCompletion(inline_completion_event) => {
+                    Some(inline_completion_event.clone())
+                }
+                _ => None,
+            },
+            call_event: match &event.event {
+                Event::Call(call_event) => Some(call_event.clone()),
+                _ => None,
+            },
+            assistant_event: match &event.event {
+                Event::Assistant(assistant_event) => Some(assistant_event.clone()),
+                _ => None,
+            },
+            cpu_event: match &event.event {
+                Event::Cpu(cpu_event) => Some(cpu_event.clone()),
+                _ => None,
+            },
+            memory_event: match &event.event {
+                Event::Memory(memory_event) => Some(memory_event.clone()),
+                _ => None,
+            },
+            app_event: match &event.event {
+                Event::App(app_event) => Some(app_event.clone()),
+                _ => None,
+            },
+            setting_event: match &event.event {
+                Event::Setting(setting_event) => Some(setting_event.clone()),
+                _ => None,
+            },
+            extension_event: match &event.event {
+                Event::Extension(extension_event) => Some(extension_event.clone()),
+                _ => None,
+            },
+            edit_event: match &event.event {
+                Event::Edit(edit_event) => Some(edit_event.clone()),
+                _ => None,
+            },
+            repl_event: match &event.event {
+                Event::Repl(repl_event) => Some(repl_event.clone()),
+                _ => None,
+            },
+            // Last use of `event`: the payload is moved out here, so no clone
+            // is needed.
+            action_event: match event.event {
+                Event::Action(action_event) => Some(action_event),
+                _ => None,
+            },
+        },
+    })
+}
+
+/// One telemetry record as produced by `for_snowflake`; `post_events`
+/// serializes each row to JSON and adds it to the Kinesis `put_records`
+/// request.
+#[derive(Serialize, Deserialize)]
+struct SnowflakeRow {
+    /// Event name derived from the event variant (e.g. `system_cpu`,
+    /// `editor_<operation>`).
+    pub event: String,
+    /// `system_id` copied from the request body, if the client sent one.
+    pub system_id: Option<String>,
+    /// Batch start time plus the event's `milliseconds_since_first_event`.
+    pub timestamp: chrono::DateTime<chrono::Utc>,
+    /// Request-level metadata together with the event-specific payload.
+    pub data: SnowflakeData,
+}
+
+/// Payload of a [`SnowflakeRow`]: metadata copied from the request body plus
+/// the fields of the single event the row describes.
+#[derive(Serialize, Deserialize)]
+struct SnowflakeData {
+    /// Identifier unique to each Zed installation (differs for stable, preview, dev)
+    pub installation_id: Option<String>,
+    /// Identifier unique to each Zed session (differs for each time you open Zed)
+    pub session_id: Option<String>,
+    /// Identifier unique to each logged in Zed user (randomly generated on first sign in)
+    pub metrics_id: Option<String>,
+    /// True for Zed staff, otherwise false
+    pub is_staff: Option<bool>,
+    /// Zed version number
+    pub app_version: String,
+    pub os_name: String,
+    pub os_version: Option<String>,
+    pub architecture: String,
+    /// Zed release channel (stable, preview, dev)
+    pub release_channel: Option<String>,
+    /// Copied from the event wrapper's `signed_in` flag.
+    pub signed_in: bool,
+
+    // Event-specific payloads. `for_snowflake` sets only the field matching
+    // the event's variant and leaves the others `None`; `#[serde(flatten)]`
+    // inlines the present payload's fields into this object instead of
+    // nesting them under the field name.
+    #[serde(flatten)]
+    pub editor_event: Option<EditorEvent>,
+    #[serde(flatten)]
+    pub inline_completion_event: Option<InlineCompletionEvent>,
+    #[serde(flatten)]
+    pub call_event: Option<CallEvent>,
+    #[serde(flatten)]
+    pub assistant_event: Option<AssistantEvent>,
+    #[serde(flatten)]
+    pub cpu_event: Option<CpuEvent>,
+    #[serde(flatten)]
+    pub memory_event: Option<MemoryEvent>,
+    #[serde(flatten)]
+    pub app_event: Option<AppEvent>,
+    #[serde(flatten)]
+    pub setting_event: Option<SettingEvent>,
+    #[serde(flatten)]
+    pub extension_event: Option<ExtensionEvent>,
+    #[serde(flatten)]
+    pub edit_event: Option<EditEvent>,
+    #[serde(flatten)]
+    pub repl_event: Option<ReplEvent>,
+    #[serde(flatten)]
+    pub action_event: Option<ActionEvent>,
+}

crates/collab/src/lib.rs 🔗

@@ -170,6 +170,10 @@ pub struct Config {
     pub blob_store_access_key: Option<String>,
     pub blob_store_secret_key: Option<String>,
     pub blob_store_bucket: Option<String>,
+    pub kinesis_region: Option<String>,
+    pub kinesis_stream: Option<String>,
+    pub kinesis_access_key: Option<String>,
+    pub kinesis_secret_key: Option<String>,
     pub zed_environment: Arc<str>,
     pub openai_api_key: Option<Arc<str>>,
     pub google_ai_api_key: Option<Arc<str>>,
@@ -238,6 +242,10 @@ impl Config {
             stripe_api_key: None,
             supermaven_admin_api_key: None,
             user_backfiller_github_access_token: None,
+            kinesis_region: None,
+            kinesis_access_key: None,
+            kinesis_secret_key: None,
+            kinesis_stream: None,
         }
     }
 }
@@ -276,6 +284,7 @@ pub struct AppState {
     pub rate_limiter: Arc<RateLimiter>,
     pub executor: Executor,
     pub clickhouse_client: Option<::clickhouse::Client>,
+    pub kinesis_client: Option<::aws_sdk_kinesis::Client>,
     pub config: Config,
 }
 
@@ -332,6 +341,11 @@ impl AppState {
                 .clickhouse_url
                 .as_ref()
                 .and_then(|_| build_clickhouse_client(&config).log_err()),
+            kinesis_client: if config.kinesis_access_key.is_some() {
+                build_kinesis_client(&config).await.log_err()
+            } else {
+                None
+            },
             config,
         };
         Ok(Arc::new(this))
@@ -381,6 +395,35 @@ async fn build_blob_store_client(config: &Config) -> anyhow::Result<aws_sdk_s3::
     Ok(aws_sdk_s3::Client::new(&s3_config))
 }
 
+/// Builds an AWS Kinesis client from the `kinesis_*` settings in `config`.
+///
+/// Static credentials are created from `kinesis_access_key` and
+/// `kinesis_secret_key`, and the client is pinned to `kinesis_region`.
+///
+/// # Errors
+///
+/// Returns an error naming the missing setting when `kinesis_access_key`,
+/// `kinesis_secret_key`, or `kinesis_region` is `None`.
+async fn build_kinesis_client(config: &Config) -> anyhow::Result<aws_sdk_kinesis::Client> {
+    // The credentials type is taken from the S3 SDK's `config` module, as in
+    // the original code; it is passed to the shared `aws_config` loader below.
+    let keys = aws_sdk_s3::config::Credentials::new(
+        config
+            .kinesis_access_key
+            .clone()
+            .ok_or_else(|| anyhow!("missing kinesis_access_key"))?,
+        config
+            .kinesis_secret_key
+            .clone()
+            .ok_or_else(|| anyhow!("missing kinesis_secret_key"))?,
+        None,
+        None,
+        "env",
+    );
+
+    let kinesis_config = aws_config::defaults(BehaviorVersion::latest())
+        .region(Region::new(
+            config
+                .kinesis_region
+                .clone()
+                .ok_or_else(|| anyhow!("missing kinesis_region"))?,
+        ))
+        .credentials_provider(keys)
+        .load()
+        .await;
+
+    Ok(aws_sdk_kinesis::Client::new(&kinesis_config))
+}
+
 fn build_clickhouse_client(config: &Config) -> anyhow::Result<::clickhouse::Client> {
     Ok(::clickhouse::Client::default()
         .with_url(

crates/collab/src/tests/test_server.rs 🔗

@@ -512,6 +512,7 @@ impl TestServer {
             rate_limiter: Arc::new(RateLimiter::new(test_db.db().clone())),
             executor,
             clickhouse_client: None,
+            kinesis_client: None,
             config: Config {
                 http_port: 0,
                 database_url: "".into(),
@@ -550,6 +551,10 @@ impl TestServer {
                 stripe_api_key: None,
                 supermaven_admin_api_key: None,
                 user_backfiller_github_access_token: None,
+                kinesis_region: None,
+                kinesis_stream: None,
+                kinesis_access_key: None,
+                kinesis_secret_key: None,
             },
         })
     }

crates/telemetry_events/src/telemetry_events.rs 🔗

@@ -4,7 +4,7 @@ use semantic_version::SemanticVersion;
 use serde::{Deserialize, Serialize};
 use std::{fmt::Display, sync::Arc, time::Duration};
 
-#[derive(Serialize, Deserialize, Debug)]
+#[derive(Serialize, Deserialize, Debug, Clone)]
 pub struct EventRequestBody {
     /// Identifier unique to each system Zed is installed on
     pub system_id: Option<String>,
@@ -32,7 +32,7 @@ impl EventRequestBody {
     }
 }
 
-#[derive(Serialize, Deserialize, Debug)]
+#[derive(Serialize, Deserialize, Debug, Clone)]
 pub struct EventWrapper {
     pub signed_in: bool,
     /// Duration between this event's timestamp and the timestamp of the first event in the current batch