Add telemetry events backend for collab (#8220)

Conrad Irwin , Marshall , and Marshall Bowers created

Send telemetry to collab not zed.dev

Release Notes:

- N/A

---------

Co-authored-by: Marshall <marshall@zed.dev>
Co-authored-by: Marshall Bowers <elliott.codes@gmail.com>

Change summary

Cargo.lock                                      |  89 ++
Cargo.toml                                      |   5 
crates/assistant/Cargo.toml                     |   1 
crates/assistant/src/assistant_panel.rs         |   2 
crates/client/Cargo.toml                        |   3 
crates/client/src/client.rs                     |   2 
crates/client/src/telemetry.rs                  | 237 +----
crates/collab/.env.toml                         |   6 
crates/collab/Cargo.toml                        |   4 
crates/collab/src/api.rs                        |   1 
crates/collab/src/api/events.rs                 | 805 +++++++++++++++++++
crates/collab/src/lib.rs                        |  50 +
crates/collab/src/main.rs                       |   1 
crates/collab/src/tests/test_server.rs          |   6 
crates/telemetry_events/Cargo.toml              |  13 
crates/telemetry_events/LICENSE-GPL             |   1 
crates/telemetry_events/src/telemetry_events.rs | 131 +++
crates/util/src/http.rs                         |  13 
typos.toml                                      |   3 
19 files changed, 1,196 insertions(+), 177 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -360,6 +360,7 @@ dependencies = [
  "serde_json",
  "settings",
  "smol",
+ "telemetry_events",
  "theme",
  "tiktoken-rs",
  "ui",
@@ -1901,6 +1902,49 @@ dependencies = [
  "util",
 ]
 
+[[package]]
+name = "clickhouse"
+version = "0.11.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a0875e527e299fc5f4faba42870bf199a39ab0bb2dbba1b8aef0a2151451130f"
+dependencies = [
+ "bstr",
+ "bytes 1.5.0",
+ "clickhouse-derive",
+ "clickhouse-rs-cityhash-sys",
+ "futures 0.3.28",
+ "hyper",
+ "hyper-tls",
+ "lz4",
+ "sealed",
+ "serde",
+ "static_assertions",
+ "thiserror",
+ "tokio",
+ "url",
+]
+
+[[package]]
+name = "clickhouse-derive"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "18af5425854858c507eec70f7deb4d5d8cec4216fcb086283a78872387281ea5"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "serde_derive_internals",
+ "syn 1.0.109",
+]
+
+[[package]]
+name = "clickhouse-rs-cityhash-sys"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4baf9d4700a28d6cb600e17ed6ae2b43298a5245f1f76b4eab63027ebfd592b9"
+dependencies = [
+ "cc",
+]
+
 [[package]]
 name = "client"
 version = "0.1.0"
@@ -1933,6 +1977,7 @@ dependencies = [
  "smol",
  "sum_tree",
  "sysinfo",
+ "telemetry_events",
  "tempfile",
  "text",
  "thiserror",
@@ -2018,6 +2063,7 @@ dependencies = [
  "channel",
  "chrono",
  "clap 3.2.25",
+ "clickhouse",
  "client",
  "clock",
  "collab_ui",
@@ -2032,6 +2078,7 @@ dependencies = [
  "futures 0.3.28",
  "git",
  "gpui",
+ "hex",
  "hyper",
  "indoc",
  "language",
@@ -2062,8 +2109,10 @@ dependencies = [
  "serde_json",
  "settings",
  "sha-1 0.9.8",
+ "sha2 0.10.7",
  "smallvec",
  "sqlx",
+ "telemetry_events",
  "text",
  "theme",
  "time",
@@ -5255,6 +5304,26 @@ dependencies = [
  "url",
 ]
 
+[[package]]
+name = "lz4"
+version = "1.24.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7e9e2dd86df36ce760a60f6ff6ad526f7ba1f14ba0356f8254fb6905e6494df1"
+dependencies = [
+ "libc",
+ "lz4-sys",
+]
+
+[[package]]
+name = "lz4-sys"
+version = "1.9.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "57d27b317e207b10f69f5e75494119e391a96f48861ae870d1da6edac98ca900"
+dependencies = [
+ "cc",
+ "libc",
+]
+
 [[package]]
 name = "mac"
 version = "0.1.1"
@@ -8176,6 +8245,18 @@ version = "4.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b"
 
+[[package]]
+name = "sealed"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6b5e421024b5e5edfbaa8e60ecf90bda9dbffc602dbb230e6028763f85f0c68c"
+dependencies = [
+ "heck 0.3.3",
+ "proc-macro2",
+ "quote",
+ "syn 1.0.109",
+]
+
 [[package]]
 name = "search"
 version = "0.1.0"
@@ -9375,6 +9456,14 @@ dependencies = [
  "workspace",
 ]
 
+[[package]]
+name = "telemetry_events"
+version = "0.1.0"
+dependencies = [
+ "serde",
+ "util",
+]
+
 [[package]]
 name = "tempfile"
 version = "3.9.0"

Cargo.toml 🔗

@@ -80,6 +80,7 @@ members = [
     "crates/theme",
     "crates/theme_importer",
     "crates/theme_selector",
+    "crates/telemetry_events",
     "crates/ui",
     "crates/util",
     "crates/vcs_menu",
@@ -172,6 +173,7 @@ text = { path = "crates/text" }
 theme = { path = "crates/theme" }
 theme_importer = { path = "crates/theme_importer" }
 theme_selector = { path = "crates/theme_selector" }
+telemetry_events = { path ="crates/telemetry_events" }
 ui = { path = "crates/ui" }
 util = { path = "crates/util" }
 vcs_menu = { path = "crates/vcs_menu" }
@@ -189,12 +191,14 @@ blade-graphics = { git = "https://github.com/kvark/blade", rev = "e9d93a4d41f394
 blade-macros = { git = "https://github.com/kvark/blade", rev = "e9d93a4d41f3946a03ffb76136290d6ccf7f2b80" }
 blade-rwh = { package = "raw-window-handle", version = "0.5" }
 chrono = { version = "0.4", features = ["serde"] }
+clickhouse = { version = "0.11.6" }
 ctor = "0.2.6"
 derive_more = "0.99.17"
 env_logger = "0.9"
 futures = "0.3"
 git2 = { version = "0.15", default-features = false }
 globset = "0.4"
+hex = "0.4.3"
 indoc = "1"
 # We explicitly disable a http2 support in isahc.
 isahc = { version = "1.7.2", default-features = false, features = ["static-curl", "text-decoding"] }
@@ -219,6 +223,7 @@ serde_derive = { version = "1.0", features = ["deserialize_in_place"] }
 serde_json = { version = "1.0", features = ["preserve_order", "raw_value"] }
 serde_json_lenient = { version = "0.1", features = ["preserve_order", "raw_value"] }
 serde_repr = "0.1"
+sha2 = "0.10"
 smallvec = { version = "1.6", features = ["union"] }
 smol = "1.2"
 strum = { version = "0.25.0", features = ["derive"] }

crates/assistant/Cargo.toml 🔗

@@ -36,6 +36,7 @@ serde.workspace = true
 serde_json.workspace = true
 settings.workspace = true
 smol.workspace = true
+telemetry_events.workspace = true
 theme.workspace = true
 tiktoken-rs.workspace = true
 ui.workspace = true

crates/assistant/src/assistant_panel.rs 🔗

@@ -15,7 +15,6 @@ use ai::{
 };
 use anyhow::{anyhow, Result};
 use chrono::{DateTime, Local};
-use client::telemetry::AssistantKind;
 use collections::{hash_map, HashMap, HashSet, VecDeque};
 use editor::{
     actions::{MoveDown, MoveUp},
@@ -52,6 +51,7 @@ use std::{
     sync::Arc,
     time::{Duration, Instant},
 };
+use telemetry_events::AssistantKind;
 use theme::ThemeSettings;
 use ui::{
     prelude::*,

crates/client/Cargo.toml 🔗

@@ -41,9 +41,10 @@ schemars.workspace = true
 serde.workspace = true
 serde_derive.workspace = true
 serde_json.workspace = true
-sha2 = "0.10"
+sha2.workspace = true
 smol.workspace = true
 sysinfo.workspace = true
+telemetry_events.workspace = true
 tempfile.workspace = true
 thiserror.workspace = true
 time.workspace = true

crates/client/src/client.rs 🔗

@@ -46,7 +46,7 @@ use util::http::{HttpClient, ZedHttpClient};
 use util::{ResultExt, TryFutureExt};
 
 pub use rpc::*;
-pub use telemetry::Event;
+pub use telemetry_events::Event;
 pub use user::*;
 
 lazy_static! {

crates/client/src/telemetry.rs 🔗

@@ -8,7 +8,6 @@ use gpui::{AppContext, AppMetadata, BackgroundExecutor, Task};
 use once_cell::sync::Lazy;
 use parking_lot::Mutex;
 use release_channel::ReleaseChannel;
-use serde::Serialize;
 use settings::{Settings, SettingsStore};
 use sha2::{Digest, Sha256};
 use std::io::Write;
@@ -16,6 +15,10 @@ use std::{env, mem, path::PathBuf, sync::Arc, time::Duration};
 use sysinfo::{
     CpuRefreshKind, Pid, PidExt, ProcessExt, ProcessRefreshKind, RefreshKind, System, SystemExt,
 };
+use telemetry_events::{
+    ActionEvent, AppEvent, AssistantEvent, AssistantKind, CallEvent, CopilotEvent, CpuEvent,
+    EditEvent, EditorEvent, Event, EventRequestBody, EventWrapper, MemoryEvent, SettingEvent,
+};
 use tempfile::NamedTempFile;
 use util::http::{self, HttpClient, Method, ZedHttpClient};
 #[cfg(not(debug_assertions))]
@@ -35,7 +38,7 @@ struct TelemetryState {
     settings: TelemetrySettings,
     metrics_id: Option<Arc<str>>,      // Per logged-in user
     installation_id: Option<Arc<str>>, // Per app installation (different for dev, nightly, preview, and stable)
-    session_id: Option<Arc<str>>,      // Per app launch
+    session_id: Option<String>,        // Per app launch
     release_channel: Option<&'static str>,
     app_metadata: AppMetadata,
     architecture: &'static str,
@@ -48,93 +51,6 @@ struct TelemetryState {
     max_queue_size: usize,
 }
 
-#[derive(Serialize, Debug)]
-struct EventRequestBody {
-    installation_id: Option<Arc<str>>,
-    session_id: Option<Arc<str>>,
-    is_staff: Option<bool>,
-    app_version: Option<String>,
-    os_name: &'static str,
-    os_version: Option<String>,
-    architecture: &'static str,
-    release_channel: Option<&'static str>,
-    events: Vec<EventWrapper>,
-}
-
-#[derive(Serialize, Debug)]
-struct EventWrapper {
-    signed_in: bool,
-    #[serde(flatten)]
-    event: Event,
-}
-
-#[derive(Clone, Debug, PartialEq, Serialize)]
-#[serde(rename_all = "snake_case")]
-pub enum AssistantKind {
-    Panel,
-    Inline,
-}
-
-#[derive(Clone, Debug, PartialEq, Serialize)]
-#[serde(tag = "type")]
-pub enum Event {
-    Editor {
-        operation: &'static str,
-        file_extension: Option<String>,
-        vim_mode: bool,
-        copilot_enabled: bool,
-        copilot_enabled_for_language: bool,
-        milliseconds_since_first_event: i64,
-    },
-    Copilot {
-        suggestion_id: Option<String>,
-        suggestion_accepted: bool,
-        file_extension: Option<String>,
-        milliseconds_since_first_event: i64,
-    },
-    Call {
-        operation: &'static str,
-        room_id: Option<u64>,
-        channel_id: Option<u64>,
-        milliseconds_since_first_event: i64,
-    },
-    Assistant {
-        conversation_id: Option<String>,
-        kind: AssistantKind,
-        model: &'static str,
-        milliseconds_since_first_event: i64,
-    },
-    Cpu {
-        usage_as_percentage: f32,
-        core_count: u32,
-        milliseconds_since_first_event: i64,
-    },
-    Memory {
-        memory_in_bytes: u64,
-        virtual_memory_in_bytes: u64,
-        milliseconds_since_first_event: i64,
-    },
-    App {
-        operation: String,
-        milliseconds_since_first_event: i64,
-    },
-    Setting {
-        setting: &'static str,
-        value: String,
-        milliseconds_since_first_event: i64,
-    },
-    Edit {
-        duration: i64,
-        environment: &'static str,
-        milliseconds_since_first_event: i64,
-    },
-    Action {
-        source: &'static str,
-        action: String,
-        milliseconds_since_first_event: i64,
-    },
-}
-
 #[cfg(debug_assertions)]
 const MAX_QUEUE_LEN: usize = 5;
 
@@ -146,7 +62,6 @@ const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
 
 #[cfg(not(debug_assertions))]
 const FLUSH_INTERVAL: Duration = Duration::from_secs(60 * 5);
-
 static ZED_CLIENT_CHECKSUM_SEED: Lazy<Option<Vec<u8>>> = Lazy::new(|| {
     option_env!("ZED_CLIENT_CHECKSUM_SEED")
         .map(|s| s.as_bytes().into())
@@ -318,15 +233,13 @@ impl Telemetry {
         copilot_enabled: bool,
         copilot_enabled_for_language: bool,
     ) {
-        let event = Event::Editor {
+        let event = Event::Editor(EditorEvent {
             file_extension,
             vim_mode,
-            operation,
+            operation: operation.into(),
             copilot_enabled,
             copilot_enabled_for_language,
-            milliseconds_since_first_event: self
-                .milliseconds_since_first_event(self.clock.utc_now()),
-        };
+        });
 
         self.report_event(event)
     }
@@ -337,13 +250,11 @@ impl Telemetry {
         suggestion_accepted: bool,
         file_extension: Option<String>,
     ) {
-        let event = Event::Copilot {
+        let event = Event::Copilot(CopilotEvent {
             suggestion_id,
             suggestion_accepted,
             file_extension,
-            milliseconds_since_first_event: self
-                .milliseconds_since_first_event(self.clock.utc_now()),
-        };
+        });
 
         self.report_event(event)
     }
@@ -354,13 +265,11 @@ impl Telemetry {
         kind: AssistantKind,
         model: &'static str,
     ) {
-        let event = Event::Assistant {
+        let event = Event::Assistant(AssistantEvent {
             conversation_id,
             kind,
-            model,
-            milliseconds_since_first_event: self
-                .milliseconds_since_first_event(self.clock.utc_now()),
-        };
+            model: model.to_string(),
+        });
 
         self.report_event(event)
     }
@@ -371,24 +280,20 @@ impl Telemetry {
         room_id: Option<u64>,
         channel_id: Option<u64>,
     ) {
-        let event = Event::Call {
-            operation,
+        let event = Event::Call(CallEvent {
+            operation: operation.to_string(),
             room_id,
             channel_id,
-            milliseconds_since_first_event: self
-                .milliseconds_since_first_event(self.clock.utc_now()),
-        };
+        });
 
         self.report_event(event)
     }
 
     pub fn report_cpu_event(self: &Arc<Self>, usage_as_percentage: f32, core_count: u32) {
-        let event = Event::Cpu {
+        let event = Event::Cpu(CpuEvent {
             usage_as_percentage,
             core_count,
-            milliseconds_since_first_event: self
-                .milliseconds_since_first_event(self.clock.utc_now()),
-        };
+        });
 
         self.report_event(event)
     }
@@ -398,22 +303,16 @@ impl Telemetry {
         memory_in_bytes: u64,
         virtual_memory_in_bytes: u64,
     ) {
-        let event = Event::Memory {
+        let event = Event::Memory(MemoryEvent {
             memory_in_bytes,
             virtual_memory_in_bytes,
-            milliseconds_since_first_event: self
-                .milliseconds_since_first_event(self.clock.utc_now()),
-        };
+        });
 
         self.report_event(event)
     }
 
     pub fn report_app_event(self: &Arc<Self>, operation: String) -> Event {
-        let event = Event::App {
-            operation,
-            milliseconds_since_first_event: self
-                .milliseconds_since_first_event(self.clock.utc_now()),
-        };
+        let event = Event::App(AppEvent { operation });
 
         self.report_event(event.clone());
 
@@ -421,12 +320,10 @@ impl Telemetry {
     }
 
     pub fn report_setting_event(self: &Arc<Self>, setting: &'static str, value: String) {
-        let event = Event::Setting {
-            setting,
+        let event = Event::Setting(SettingEvent {
+            setting: setting.to_string(),
             value,
-            milliseconds_since_first_event: self
-                .milliseconds_since_first_event(self.clock.utc_now()),
-        };
+        });
 
         self.report_event(event)
     }
@@ -437,42 +334,24 @@ impl Telemetry {
         drop(state);
 
         if let Some((start, end, environment)) = period_data {
-            let event = Event::Edit {
+            let event = Event::Edit(EditEvent {
                 duration: end.timestamp_millis() - start.timestamp_millis(),
-                environment,
-                milliseconds_since_first_event: self
-                    .milliseconds_since_first_event(self.clock.utc_now()),
-            };
+                environment: environment.to_string(),
+            });
 
             self.report_event(event);
         }
     }
 
     pub fn report_action_event(self: &Arc<Self>, source: &'static str, action: String) {
-        let event = Event::Action {
-            source,
+        let event = Event::Action(ActionEvent {
+            source: source.to_string(),
             action,
-            milliseconds_since_first_event: self
-                .milliseconds_since_first_event(self.clock.utc_now()),
-        };
+        });
 
         self.report_event(event)
     }
 
-    fn milliseconds_since_first_event(self: &Arc<Self>, date_time: DateTime<Utc>) -> i64 {
-        let mut state = self.state.lock();
-
-        match state.first_event_date_time {
-            Some(first_event_date_time) => {
-                date_time.timestamp_millis() - first_event_date_time.timestamp_millis()
-            }
-            None => {
-                state.first_event_date_time = Some(date_time);
-                0
-            }
-        }
-    }
-
     fn report_event(self: &Arc<Self>, event: Event) {
         let mut state = self.state.lock();
 
@@ -489,8 +368,24 @@ impl Telemetry {
             }));
         }
 
+        let date_time = self.clock.utc_now();
+
+        let milliseconds_since_first_event = match state.first_event_date_time {
+            Some(first_event_date_time) => {
+                date_time.timestamp_millis() - first_event_date_time.timestamp_millis()
+            }
+            None => {
+                state.first_event_date_time = Some(date_time);
+                0
+            }
+        };
+
         let signed_in = state.metrics_id.is_some();
-        state.events_queue.push(EventWrapper { signed_in, event });
+        state.events_queue.push(EventWrapper {
+            signed_in,
+            milliseconds_since_first_event,
+            event,
+        });
 
         if state.installation_id.is_some() {
             if state.events_queue.len() >= state.max_queue_size {
@@ -545,21 +440,22 @@ impl Telemetry {
                     {
                         let state = this.state.lock();
                         let request_body = EventRequestBody {
-                            installation_id: state.installation_id.clone(),
+                            installation_id: state.installation_id.as_deref().map(Into::into),
                             session_id: state.session_id.clone(),
                             is_staff: state.is_staff.clone(),
                             app_version: state
                                 .app_metadata
                                 .app_version
-                                .map(|version| version.to_string()),
-                            os_name: state.app_metadata.os_name,
+                                .unwrap_or_default()
+                                .to_string(),
+                            os_name: state.app_metadata.os_name.to_string(),
                             os_version: state
                                 .app_metadata
                                 .os_version
                                 .map(|version| version.to_string()),
-                            architecture: state.architecture,
+                            architecture: state.architecture.to_string(),
 
-                            release_channel: state.release_channel,
+                            release_channel: state.release_channel.map(Into::into),
                             events,
                         };
                         json_bytes.clear();
@@ -578,7 +474,7 @@ impl Telemetry {
 
                     let request = http::Request::builder()
                         .method(Method::POST)
-                        .uri(&this.http_client.zed_url("/api/events"))
+                        .uri(this.http_client.zed_api_url("/telemetry/events"))
                         .header("Content-Type", "text/plain")
                         .header("x-zed-checksum", checksum)
                         .body(json_bytes.into());
@@ -627,10 +523,9 @@ mod tests {
             let event = telemetry.report_app_event(operation.clone());
             assert_eq!(
                 event,
-                Event::App {
+                Event::App(AppEvent {
                     operation: operation.clone(),
-                    milliseconds_since_first_event: 0
-                }
+                })
             );
             assert_eq!(telemetry.state.lock().events_queue.len(), 1);
             assert!(telemetry.state.lock().flush_events_task.is_some());
@@ -644,10 +539,9 @@ mod tests {
             let event = telemetry.report_app_event(operation.clone());
             assert_eq!(
                 event,
-                Event::App {
+                Event::App(AppEvent {
                     operation: operation.clone(),
-                    milliseconds_since_first_event: 100
-                }
+                })
             );
             assert_eq!(telemetry.state.lock().events_queue.len(), 2);
             assert!(telemetry.state.lock().flush_events_task.is_some());
@@ -661,10 +555,9 @@ mod tests {
             let event = telemetry.report_app_event(operation.clone());
             assert_eq!(
                 event,
-                Event::App {
+                Event::App(AppEvent {
                     operation: operation.clone(),
-                    milliseconds_since_first_event: 200
-                }
+                })
             );
             assert_eq!(telemetry.state.lock().events_queue.len(), 3);
             assert!(telemetry.state.lock().flush_events_task.is_some());
@@ -679,10 +572,9 @@ mod tests {
             let event = telemetry.report_app_event(operation.clone());
             assert_eq!(
                 event,
-                Event::App {
+                Event::App(AppEvent {
                     operation: operation.clone(),
-                    milliseconds_since_first_event: 300
-                }
+                })
             );
 
             assert!(is_empty_state(&telemetry));
@@ -712,10 +604,9 @@ mod tests {
             let event = telemetry.report_app_event(operation.clone());
             assert_eq!(
                 event,
-                Event::App {
+                Event::App(AppEvent {
                     operation: operation.clone(),
-                    milliseconds_since_first_event: 0
-                }
+                })
             );
             assert_eq!(telemetry.state.lock().events_queue.len(), 1);
             assert!(telemetry.state.lock().flush_events_task.is_some());

crates/collab/.env.toml 🔗

@@ -12,6 +12,12 @@ BLOB_STORE_SECRET_KEY = "the-blob-store-secret-key"
 BLOB_STORE_BUCKET = "the-extensions-bucket"
 BLOB_STORE_URL = "http://127.0.0.1:9000"
 BLOB_STORE_REGION = "the-region"
+ZED_CLIENT_CHECKSUM_SEED = "development-client-checksum-seed"
+
+CLICKHOUSE_URL = "http://localhost:8123"
+CLICKHOUSE_USER = ""
+CLICKHOUSE_PASSWORD = ""
+CLICKHOUSE_DATABASE = "zed"
 
 # RUST_LOG=info
 # LOG_JSON=true

crates/collab/Cargo.toml 🔗

@@ -25,10 +25,12 @@ base64 = "0.13"
 chrono.workspace = true
 clap = { version = "3.1", features = ["derive"], optional = true }
 clock.workspace = true
+clickhouse.workspace = true
 collections.workspace = true
 dashmap = "5.4"
 envy = "0.4.2"
 futures.workspace = true
+hex.workspace = true
 hyper = "0.14"
 lazy_static.workspace = true
 lipsum = { version = "0.8", optional = true }
@@ -48,8 +50,10 @@ serde.workspace = true
 serde_derive.workspace = true
 serde_json.workspace = true
 sha-1 = "0.9"
+sha2.workspace = true
 smallvec.workspace = true
 sqlx = { version = "0.7", features = ["runtime-tokio-rustls", "postgres", "json", "time", "uuid", "any"] }
+telemetry_events.workspace = true
 text.workspace = true
 time.workspace = true
 tokio = { version = "1", features = ["full"] }

crates/collab/src/api/events.rs 🔗

@@ -0,0 +1,805 @@
+use std::sync::Arc;
+
+use anyhow::{anyhow, Context};
+use axum::{
+    body::Bytes, headers::Header, http::HeaderName, routing::post, Extension, Router, TypedHeader,
+};
+use hyper::StatusCode;
+use lazy_static::lazy_static;
+use serde::{Serialize, Serializer};
+use sha2::{Digest, Sha256};
+use telemetry_events::{
+    ActionEvent, AppEvent, AssistantEvent, CallEvent, CopilotEvent, CpuEvent, EditEvent,
+    EditorEvent, Event, EventRequestBody, EventWrapper, MemoryEvent, SettingEvent,
+};
+
+use crate::{AppState, Error, Result};
+
+pub fn router() -> Router {
+    Router::new().route("/telemetry/events", post(post_events))
+}
+
+lazy_static! {
+    static ref ZED_CHECKSUM_HEADER: HeaderName = HeaderName::from_static("x-zed-checksum");
+    static ref CLOUDFLARE_IP_COUNTRY_HEADER: HeaderName = HeaderName::from_static("cf-ipcountry");
+}
+
+pub struct ZedChecksumHeader(Vec<u8>);
+
+impl Header for ZedChecksumHeader {
+    fn name() -> &'static HeaderName {
+        &ZED_CHECKSUM_HEADER
+    }
+
+    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
+    where
+        Self: Sized,
+        I: Iterator<Item = &'i axum::http::HeaderValue>,
+    {
+        let checksum = values
+            .next()
+            .ok_or_else(axum::headers::Error::invalid)?
+            .to_str()
+            .map_err(|_| axum::headers::Error::invalid())?;
+
+        let bytes = hex::decode(checksum).map_err(|_| axum::headers::Error::invalid())?;
+        Ok(Self(bytes))
+    }
+
+    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
+        unimplemented!()
+    }
+}
+
+pub struct CloudflareIpCountryHeader(String);
+
+impl Header for CloudflareIpCountryHeader {
+    fn name() -> &'static HeaderName {
+        &CLOUDFLARE_IP_COUNTRY_HEADER
+    }
+
+    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
+    where
+        Self: Sized,
+        I: Iterator<Item = &'i axum::http::HeaderValue>,
+    {
+        let country_code = values
+            .next()
+            .ok_or_else(axum::headers::Error::invalid)?
+            .to_str()
+            .map_err(|_| axum::headers::Error::invalid())?;
+
+        Ok(Self(country_code.to_string()))
+    }
+
+    fn encode<E: Extend<axum::http::HeaderValue>>(&self, _values: &mut E) {
+        unimplemented!()
+    }
+}
+
+pub async fn post_events(
+    Extension(app): Extension<Arc<AppState>>,
+    TypedHeader(ZedChecksumHeader(checksum)): TypedHeader<ZedChecksumHeader>,
+    country_code_header: Option<TypedHeader<CloudflareIpCountryHeader>>,
+    body: Bytes,
+) -> Result<()> {
+    let Some(clickhouse_client) = app.clickhouse_client.clone() else {
+        Err(Error::Http(
+            StatusCode::NOT_IMPLEMENTED,
+            "not supported".into(),
+        ))?
+    };
+
+    let Some(checksum_seed) = app.config.zed_client_checksum_seed.as_ref() else {
+        return Err(Error::Http(
+            StatusCode::INTERNAL_SERVER_ERROR,
+            "events not enabled".into(),
+        ))?;
+    };
+
+    let mut summer = Sha256::new();
+    summer.update(checksum_seed);
+    summer.update(&body);
+    summer.update(checksum_seed);
+
+    if &checksum[..] != &summer.finalize()[..] {
+        return Err(Error::Http(
+            StatusCode::BAD_REQUEST,
+            "invalid checksum".into(),
+        ))?;
+    }
+
+    let request_body: telemetry_events::EventRequestBody =
+        serde_json::from_slice(&body).map_err(|err| {
+            log::error!("can't parse event json: {err}");
+            Error::Internal(anyhow!(err))
+        })?;
+
+    let mut to_upload = ToUpload::default();
+    let Some(last_event) = request_body.events.last() else {
+        return Err(Error::Http(StatusCode::BAD_REQUEST, "no events".into()))?;
+    };
+    let country_code = country_code_header.map(|h| h.0 .0);
+
+    let first_event_at = chrono::Utc::now()
+        - chrono::Duration::milliseconds(last_event.milliseconds_since_first_event);
+
+    for wrapper in &request_body.events {
+        match &wrapper.event {
+            Event::Editor(event) => to_upload.editor_events.push(EditorEventRow::from_event(
+                event.clone(),
+                &wrapper,
+                &request_body,
+                first_event_at,
+                country_code.clone(),
+            )),
+            Event::Copilot(event) => to_upload.copilot_events.push(CopilotEventRow::from_event(
+                event.clone(),
+                &wrapper,
+                &request_body,
+                first_event_at,
+                country_code.clone(),
+            )),
+            Event::Call(event) => to_upload.call_events.push(CallEventRow::from_event(
+                event.clone(),
+                &wrapper,
+                &request_body,
+                first_event_at,
+            )),
+            Event::Assistant(event) => {
+                to_upload
+                    .assistant_events
+                    .push(AssistantEventRow::from_event(
+                        event.clone(),
+                        &wrapper,
+                        &request_body,
+                        first_event_at,
+                    ))
+            }
+            Event::Cpu(event) => to_upload.cpu_events.push(CpuEventRow::from_event(
+                event.clone(),
+                &wrapper,
+                &request_body,
+                first_event_at,
+            )),
+            Event::Memory(event) => to_upload.memory_events.push(MemoryEventRow::from_event(
+                event.clone(),
+                &wrapper,
+                &request_body,
+                first_event_at,
+            )),
+            Event::App(event) => to_upload.app_events.push(AppEventRow::from_event(
+                event.clone(),
+                &wrapper,
+                &request_body,
+                first_event_at,
+            )),
+            Event::Setting(event) => to_upload.setting_events.push(SettingEventRow::from_event(
+                event.clone(),
+                &wrapper,
+                &request_body,
+                first_event_at,
+            )),
+            Event::Edit(event) => to_upload.edit_events.push(EditEventRow::from_event(
+                event.clone(),
+                &wrapper,
+                &request_body,
+                first_event_at,
+            )),
+            Event::Action(event) => to_upload.action_events.push(ActionEventRow::from_event(
+                event.clone(),
+                &wrapper,
+                &request_body,
+                first_event_at,
+            )),
+        }
+    }
+
+    to_upload
+        .upload(&clickhouse_client)
+        .await
+        .map_err(|err| Error::Internal(anyhow!(err)))?;
+
+    Ok(())
+}
+
+#[derive(Default)]
+struct ToUpload {
+    editor_events: Vec<EditorEventRow>,
+    copilot_events: Vec<CopilotEventRow>,
+    assistant_events: Vec<AssistantEventRow>,
+    call_events: Vec<CallEventRow>,
+    cpu_events: Vec<CpuEventRow>,
+    memory_events: Vec<MemoryEventRow>,
+    app_events: Vec<AppEventRow>,
+    setting_events: Vec<SettingEventRow>,
+    edit_events: Vec<EditEventRow>,
+    action_events: Vec<ActionEventRow>,
+}
+
+impl ToUpload {
+    pub async fn upload(&self, clickhouse_client: &clickhouse::Client) -> anyhow::Result<()> {
+        Self::upload_to_table("editor_events", &self.editor_events, clickhouse_client)
+            .await
+            .with_context(|| format!("failed to upload to table 'editor_events'"))?;
+        Self::upload_to_table("copilot_events", &self.copilot_events, clickhouse_client)
+            .await
+            .with_context(|| format!("failed to upload to table 'copilot_events'"))?;
+        Self::upload_to_table(
+            "assistant_events",
+            &self.assistant_events,
+            clickhouse_client,
+        )
+        .await
+        .with_context(|| format!("failed to upload to table 'assistant_events'"))?;
+        Self::upload_to_table("call_events", &self.call_events, clickhouse_client)
+            .await
+            .with_context(|| format!("failed to upload to table 'call_events'"))?;
+        Self::upload_to_table("cpu_events", &self.cpu_events, clickhouse_client)
+            .await
+            .with_context(|| format!("failed to upload to table 'cpu_events'"))?;
+        Self::upload_to_table("memory_events", &self.memory_events, clickhouse_client)
+            .await
+            .with_context(|| format!("failed to upload to table 'memory_events'"))?;
+        Self::upload_to_table("app_events", &self.app_events, clickhouse_client)
+            .await
+            .with_context(|| format!("failed to upload to table 'app_events'"))?;
+        Self::upload_to_table("setting_events", &self.setting_events, clickhouse_client)
+            .await
+            .with_context(|| format!("failed to upload to table 'setting_events'"))?;
+        Self::upload_to_table("edit_events", &self.edit_events, clickhouse_client)
+            .await
+            .with_context(|| format!("failed to upload to table 'edit_events'"))?;
+        Self::upload_to_table("action_events", &self.action_events, clickhouse_client)
+            .await
+            .with_context(|| format!("failed to upload to table 'action_events'"))?;
+        Ok(())
+    }
+
+    async fn upload_to_table<T: clickhouse::Row + Serialize + std::fmt::Debug>(
+        table: &str,
+        rows: &[T],
+        clickhouse_client: &clickhouse::Client,
+    ) -> anyhow::Result<()> {
+        if !rows.is_empty() {
+            let mut insert = clickhouse_client.insert(table)?;
+
+            for event in rows {
+                insert.write(event).await?;
+            }
+
+            insert.end().await?;
+        }
+
+        Ok(())
+    }
+}
+
+pub fn serialize_country_code<S>(country_code: &str, serializer: S) -> Result<S::Ok, S::Error>
+where
+    S: Serializer,
+{
+    if country_code.len() != 2 {
+        use serde::ser::Error;
+        return Err(S::Error::custom(
+            "country_code must be exactly 2 characters",
+        ));
+    }
+
+    let country_code = country_code.as_bytes();
+
+    serializer.serialize_u16(((country_code[0] as u16) << 8) + country_code[1] as u16)
+}
+
+#[derive(Serialize, Debug, clickhouse::Row)]
+pub struct EditorEventRow {
+    pub installation_id: String,
+    pub operation: String,
+    pub app_version: String,
+    pub file_extension: String,
+    pub os_name: String,
+    pub os_version: String,
+    pub release_channel: String,
+    pub signed_in: bool,
+    pub vim_mode: bool,
+    #[serde(serialize_with = "serialize_country_code")]
+    pub country_code: String,
+    pub region_code: String,
+    pub city: String,
+    pub time: i64,
+    pub copilot_enabled: bool,
+    pub copilot_enabled_for_language: bool,
+    pub historical_event: bool,
+    pub architecture: String,
+    pub is_staff: Option<bool>,
+    pub session_id: Option<String>,
+    pub major: Option<i32>,
+    pub minor: Option<i32>,
+    pub patch: Option<i32>,
+}
+
+impl EditorEventRow {
+    fn from_event(
+        event: EditorEvent,
+        wrapper: &EventWrapper,
+        body: &EventRequestBody,
+        first_event_at: chrono::DateTime<chrono::Utc>,
+        country_code: Option<String>,
+    ) -> Self {
+        let semver = body.semver();
+        let time =
+            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
+
+        Self {
+            app_version: body.app_version.clone(),
+            major: semver.map(|s| s.major as i32),
+            minor: semver.map(|s| s.minor as i32),
+            patch: semver.map(|s| s.patch as i32),
+            release_channel: body.release_channel.clone().unwrap_or_default(),
+            os_name: body.os_name.clone(),
+            os_version: body.os_version.clone().unwrap_or_default(),
+            architecture: body.architecture.clone(),
+            installation_id: body.installation_id.clone().unwrap_or_default(),
+            session_id: body.session_id.clone(),
+            is_staff: body.is_staff,
+            time: time.timestamp_millis(),
+            operation: event.operation,
+            file_extension: event.file_extension.unwrap_or_default(),
+            signed_in: wrapper.signed_in,
+            vim_mode: event.vim_mode,
+            copilot_enabled: event.copilot_enabled,
+            copilot_enabled_for_language: event.copilot_enabled_for_language,
+            country_code: country_code.unwrap_or("XX".to_string()),
+            region_code: "".to_string(),
+            city: "".to_string(),
+            historical_event: false,
+        }
+    }
+}
+
+#[derive(Serialize, Debug, clickhouse::Row)]
+pub struct CopilotEventRow {
+    pub installation_id: String,
+    pub suggestion_id: String,
+    pub suggestion_accepted: bool,
+    pub app_version: String,
+    pub file_extension: String,
+    pub os_name: String,
+    pub os_version: String,
+    pub release_channel: String,
+    pub signed_in: bool,
+    #[serde(serialize_with = "serialize_country_code")]
+    pub country_code: String,
+    pub region_code: String,
+    pub city: String,
+    pub time: i64,
+    pub is_staff: Option<bool>,
+    pub session_id: Option<String>,
+    pub major: Option<i32>,
+    pub minor: Option<i32>,
+    pub patch: Option<i32>,
+}
+
+impl CopilotEventRow {
+    fn from_event(
+        event: CopilotEvent,
+        wrapper: &EventWrapper,
+        body: &EventRequestBody,
+        first_event_at: chrono::DateTime<chrono::Utc>,
+        country_code: Option<String>,
+    ) -> Self {
+        let semver = body.semver();
+        let time =
+            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
+
+        Self {
+            app_version: body.app_version.clone(),
+            major: semver.map(|s| s.major as i32),
+            minor: semver.map(|s| s.minor as i32),
+            patch: semver.map(|s| s.patch as i32),
+            release_channel: body.release_channel.clone().unwrap_or_default(),
+            os_name: body.os_name.clone(),
+            os_version: body.os_version.clone().unwrap_or_default(),
+            installation_id: body.installation_id.clone().unwrap_or_default(),
+            session_id: body.session_id.clone(),
+            is_staff: body.is_staff,
+            time: time.timestamp_millis(),
+            file_extension: event.file_extension.unwrap_or_default(),
+            signed_in: wrapper.signed_in,
+            country_code: country_code.unwrap_or("XX".to_string()),
+            region_code: "".to_string(),
+            city: "".to_string(),
+            suggestion_id: event.suggestion_id.unwrap_or_default(),
+            suggestion_accepted: event.suggestion_accepted,
+        }
+    }
+}
+
+#[derive(Serialize, Debug, clickhouse::Row)]
+pub struct CallEventRow {
+    // AppInfoBase
+    app_version: String,
+    major: Option<i32>,
+    minor: Option<i32>,
+    patch: Option<i32>,
+    release_channel: String,
+
+    // ClientEventBase
+    installation_id: Option<String>,
+    session_id: Option<String>,
+    is_staff: Option<bool>,
+    time: i64,
+
+    // CallEventRow
+    operation: String,
+    room_id: Option<u64>,
+    channel_id: Option<u64>,
+}
+
+impl CallEventRow {
+    fn from_event(
+        event: CallEvent,
+        wrapper: &EventWrapper,
+        body: &EventRequestBody,
+        first_event_at: chrono::DateTime<chrono::Utc>,
+    ) -> Self {
+        let semver = body.semver();
+        let time =
+            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
+
+        Self {
+            app_version: body.app_version.clone(),
+            major: semver.map(|s| s.major as i32),
+            minor: semver.map(|s| s.minor as i32),
+            patch: semver.map(|s| s.patch as i32),
+            release_channel: body.release_channel.clone().unwrap_or_default(),
+            installation_id: body.installation_id.clone(),
+            session_id: body.session_id.clone(),
+            is_staff: body.is_staff,
+            time: time.timestamp_millis(),
+            operation: event.operation,
+            room_id: event.room_id,
+            channel_id: event.channel_id,
+        }
+    }
+}
+
+#[derive(Serialize, Debug, clickhouse::Row)]
+pub struct AssistantEventRow {
+    // AppInfoBase
+    app_version: String,
+    major: Option<i32>,
+    minor: Option<i32>,
+    patch: Option<i32>,
+    release_channel: String,
+
+    // ClientEventBase
+    installation_id: Option<String>,
+    session_id: Option<String>,
+    is_staff: Option<bool>,
+    time: i64,
+
+    // AssistantEventRow
+    conversation_id: Option<String>,
+    kind: String,
+    model: String,
+}
+
+impl AssistantEventRow {
+    fn from_event(
+        event: AssistantEvent,
+        wrapper: &EventWrapper,
+        body: &EventRequestBody,
+        first_event_at: chrono::DateTime<chrono::Utc>,
+    ) -> Self {
+        let semver = body.semver();
+        let time =
+            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
+
+        Self {
+            app_version: body.app_version.clone(),
+            major: semver.map(|s| s.major as i32),
+            minor: semver.map(|s| s.minor as i32),
+            patch: semver.map(|s| s.patch as i32),
+            release_channel: body.release_channel.clone().unwrap_or_default(),
+            installation_id: body.installation_id.clone(),
+            session_id: body.session_id.clone(),
+            is_staff: body.is_staff,
+            time: time.timestamp_millis(),
+            conversation_id: event.conversation_id,
+            kind: event.kind.to_string(),
+            model: event.model,
+        }
+    }
+}
+
+#[derive(Debug, clickhouse::Row, Serialize)]
+pub struct CpuEventRow {
+    pub installation_id: Option<String>,
+    pub is_staff: Option<bool>,
+    pub usage_as_percentage: f32,
+    pub core_count: u32,
+    pub app_version: String,
+    pub release_channel: String,
+    pub time: i64,
+    pub session_id: Option<String>,
+    // pub normalized_cpu_usage: f64, MATERIALIZED
+    pub major: Option<i32>,
+    pub minor: Option<i32>,
+    pub patch: Option<i32>,
+}
+
+impl CpuEventRow {
+    fn from_event(
+        event: CpuEvent,
+        wrapper: &EventWrapper,
+        body: &EventRequestBody,
+        first_event_at: chrono::DateTime<chrono::Utc>,
+    ) -> Self {
+        let semver = body.semver();
+        let time =
+            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
+
+        Self {
+            app_version: body.app_version.clone(),
+            major: semver.map(|s| s.major as i32),
+            minor: semver.map(|s| s.minor as i32),
+            patch: semver.map(|s| s.patch as i32),
+            release_channel: body.release_channel.clone().unwrap_or_default(),
+            installation_id: body.installation_id.clone(),
+            session_id: body.session_id.clone(),
+            is_staff: body.is_staff,
+            time: time.timestamp_millis(),
+            usage_as_percentage: event.usage_as_percentage,
+            core_count: event.core_count,
+        }
+    }
+}
+
+#[derive(Serialize, Debug, clickhouse::Row)]
+pub struct MemoryEventRow {
+    // AppInfoBase
+    app_version: String,
+    major: Option<i32>,
+    minor: Option<i32>,
+    patch: Option<i32>,
+    release_channel: String,
+
+    // ClientEventBase
+    installation_id: Option<String>,
+    session_id: Option<String>,
+    is_staff: Option<bool>,
+    time: i64,
+
+    // MemoryEventRow
+    memory_in_bytes: u64,
+    virtual_memory_in_bytes: u64,
+}
+
+impl MemoryEventRow {
+    fn from_event(
+        event: MemoryEvent,
+        wrapper: &EventWrapper,
+        body: &EventRequestBody,
+        first_event_at: chrono::DateTime<chrono::Utc>,
+    ) -> Self {
+        let semver = body.semver();
+        let time =
+            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
+
+        Self {
+            app_version: body.app_version.clone(),
+            major: semver.map(|s| s.major as i32),
+            minor: semver.map(|s| s.minor as i32),
+            patch: semver.map(|s| s.patch as i32),
+            release_channel: body.release_channel.clone().unwrap_or_default(),
+            installation_id: body.installation_id.clone(),
+            session_id: body.session_id.clone(),
+            is_staff: body.is_staff,
+            time: time.timestamp_millis(),
+            memory_in_bytes: event.memory_in_bytes,
+            virtual_memory_in_bytes: event.virtual_memory_in_bytes,
+        }
+    }
+}
+
+#[derive(Serialize, Debug, clickhouse::Row)]
+pub struct AppEventRow {
+    // AppInfoBase
+    app_version: String,
+    major: Option<i32>,
+    minor: Option<i32>,
+    patch: Option<i32>,
+    release_channel: String,
+
+    // ClientEventBase
+    installation_id: Option<String>,
+    session_id: Option<String>,
+    is_staff: Option<bool>,
+    time: i64,
+
+    // AppEventRow
+    operation: String,
+}
+
+impl AppEventRow {
+    fn from_event(
+        event: AppEvent,
+        wrapper: &EventWrapper,
+        body: &EventRequestBody,
+        first_event_at: chrono::DateTime<chrono::Utc>,
+    ) -> Self {
+        let semver = body.semver();
+        let time =
+            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
+
+        Self {
+            app_version: body.app_version.clone(),
+            major: semver.map(|s| s.major as i32),
+            minor: semver.map(|s| s.minor as i32),
+            patch: semver.map(|s| s.patch as i32),
+            release_channel: body.release_channel.clone().unwrap_or_default(),
+            installation_id: body.installation_id.clone(),
+            session_id: body.session_id.clone(),
+            is_staff: body.is_staff,
+            time: time.timestamp_millis(),
+            operation: event.operation,
+        }
+    }
+}
+
+#[derive(Serialize, Debug, clickhouse::Row)]
+pub struct SettingEventRow {
+    // AppInfoBase
+    app_version: String,
+    major: Option<i32>,
+    minor: Option<i32>,
+    patch: Option<i32>,
+    release_channel: String,
+
+    // ClientEventBase
+    installation_id: Option<String>,
+    session_id: Option<String>,
+    is_staff: Option<bool>,
+    time: i64,
+    // SettingEventRow
+    setting: String,
+    value: String,
+}
+
+impl SettingEventRow {
+    fn from_event(
+        event: SettingEvent,
+        wrapper: &EventWrapper,
+        body: &EventRequestBody,
+        first_event_at: chrono::DateTime<chrono::Utc>,
+    ) -> Self {
+        let semver = body.semver();
+        let time =
+            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
+
+        Self {
+            app_version: body.app_version.clone(),
+            major: semver.map(|s| s.major as i32),
+            minor: semver.map(|s| s.minor as i32),
+            patch: semver.map(|s| s.patch as i32),
+            release_channel: body.release_channel.clone().unwrap_or_default(),
+            installation_id: body.installation_id.clone(),
+            session_id: body.session_id.clone(),
+            is_staff: body.is_staff,
+            time: time.timestamp_millis(),
+            setting: event.setting,
+            value: event.value,
+        }
+    }
+}
+
+#[derive(Serialize, Debug, clickhouse::Row)]
+pub struct EditEventRow {
+    // AppInfoBase
+    app_version: String,
+    major: Option<i32>,
+    minor: Option<i32>,
+    patch: Option<i32>,
+    release_channel: String,
+
+    // SystemInfoBase
+    os_name: String,
+    os_version: Option<String>,
+    architecture: String,
+
+    // ClientEventBase
+    installation_id: Option<String>,
+    // Note: This column name has a typo in the ClickHouse table.
+    #[serde(rename = "sesssion_id")]
+    session_id: Option<String>,
+    is_staff: Option<bool>,
+    time: i64,
+
+    // EditEventRow
+    period_start: i64,
+    period_end: i64,
+    environment: String,
+}
+
+impl EditEventRow {
+    fn from_event(
+        event: EditEvent,
+        wrapper: &EventWrapper,
+        body: &EventRequestBody,
+        first_event_at: chrono::DateTime<chrono::Utc>,
+    ) -> Self {
+        let semver = body.semver();
+        let time =
+            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
+
+        let period_start = time - chrono::Duration::milliseconds(event.duration);
+        let period_end = time;
+
+        Self {
+            app_version: body.app_version.clone(),
+            major: semver.map(|s| s.major as i32),
+            minor: semver.map(|s| s.minor as i32),
+            patch: semver.map(|s| s.patch as i32),
+            release_channel: body.release_channel.clone().unwrap_or_default(),
+            os_name: body.os_name.clone(),
+            os_version: body.os_version.clone(),
+            architecture: body.architecture.clone(),
+            installation_id: body.installation_id.clone(),
+            session_id: body.session_id.clone(),
+            is_staff: body.is_staff,
+            time: time.timestamp_millis(),
+            period_start: period_start.timestamp_millis(),
+            period_end: period_end.timestamp_millis(),
+            environment: event.environment,
+        }
+    }
+}
+
+#[derive(Serialize, Debug, clickhouse::Row)]
+pub struct ActionEventRow {
+    // AppInfoBase
+    app_version: String,
+    major: Option<i32>,
+    minor: Option<i32>,
+    patch: Option<i32>,
+    release_channel: String,
+
+    // ClientEventBase
+    installation_id: Option<String>,
+    // Note: This column name has a typo in the ClickHouse table.
+    #[serde(rename = "sesssion_id")]
+    session_id: Option<String>,
+    is_staff: Option<bool>,
+    time: i64,
+    // ActionEventRow
+    source: String,
+    action: String,
+}
+
+impl ActionEventRow {
+    fn from_event(
+        event: ActionEvent,
+        wrapper: &EventWrapper,
+        body: &EventRequestBody,
+        first_event_at: chrono::DateTime<chrono::Utc>,
+    ) -> Self {
+        let semver = body.semver();
+        let time =
+            first_event_at + chrono::Duration::milliseconds(wrapper.milliseconds_since_first_event);
+
+        Self {
+            app_version: body.app_version.clone(),
+            major: semver.map(|s| s.major as i32),
+            minor: semver.map(|s| s.minor as i32),
+            patch: semver.map(|s| s.patch as i32),
+            release_channel: body.release_channel.clone().unwrap_or_default(),
+            installation_id: body.installation_id.clone(),
+            session_id: body.session_id.clone(),
+            is_staff: body.is_staff,
+            time: time.timestamp_millis(),
+            source: event.source,
+            action: event.action,
+        }
+    }
+}

crates/collab/src/lib.rs 🔗

@@ -58,11 +58,24 @@ impl From<serde_json::Error> for Error {
 impl IntoResponse for Error {
     fn into_response(self) -> axum::response::Response {
         match self {
-            Error::Http(code, message) => (code, message).into_response(),
+            Error::Http(code, message) => {
+                log::error!("HTTP error {}: {}", code, &message);
+                (code, message).into_response()
+            }
             Error::Database(error) => {
+                log::error!(
+                    "HTTP error {}: {}",
+                    StatusCode::INTERNAL_SERVER_ERROR,
+                    &error
+                );
                 (StatusCode::INTERNAL_SERVER_ERROR, format!("{}", &error)).into_response()
             }
             Error::Internal(error) => {
+                log::error!(
+                    "HTTP error {}: {}",
+                    StatusCode::INTERNAL_SERVER_ERROR,
+                    &error
+                );
                 (StatusCode::INTERNAL_SERVER_ERROR, format!("{}", &error)).into_response()
             }
         }
@@ -97,6 +110,10 @@ pub struct Config {
     pub database_url: String,
     pub database_max_connections: u32,
     pub api_token: String,
+    pub clickhouse_url: Option<String>,
+    pub clickhouse_user: Option<String>,
+    pub clickhouse_password: Option<String>,
+    pub clickhouse_database: Option<String>,
     pub invite_link_prefix: String,
     pub live_kit_server: Option<String>,
     pub live_kit_key: Option<String>,
@@ -109,6 +126,7 @@ pub struct Config {
     pub blob_store_secret_key: Option<String>,
     pub blob_store_bucket: Option<String>,
     pub zed_environment: Arc<str>,
+    pub zed_client_checksum_seed: Option<String>,
 }
 
 impl Config {
@@ -127,6 +145,7 @@ pub struct AppState {
     pub db: Arc<Database>,
     pub live_kit_client: Option<Arc<dyn live_kit_server::api::Client>>,
     pub blob_store_client: Option<aws_sdk_s3::Client>,
+    pub clickhouse_client: Option<clickhouse::Client>,
     pub config: Config,
 }
 
@@ -156,6 +175,7 @@ impl AppState {
             db: Arc::new(db),
             live_kit_client,
             blob_store_client: build_blob_store_client(&config).await.log_err(),
+            clickhouse_client: build_clickhouse_client(&config).log_err(),
             config,
         };
         Ok(Arc::new(this))
@@ -196,3 +216,31 @@ async fn build_blob_store_client(config: &Config) -> anyhow::Result<aws_sdk_s3::
 
     Ok(aws_sdk_s3::Client::new(&s3_config))
 }
+
+fn build_clickhouse_client(config: &Config) -> anyhow::Result<clickhouse::Client> {
+    Ok(clickhouse::Client::default()
+        .with_url(
+            config
+                .clickhouse_url
+                .as_ref()
+                .ok_or_else(|| anyhow!("missing clickhouse_url"))?,
+        )
+        .with_user(
+            config
+                .clickhouse_user
+                .as_ref()
+                .ok_or_else(|| anyhow!("missing clickhouse_user"))?,
+        )
+        .with_password(
+            config
+                .clickhouse_password
+                .as_ref()
+                .ok_or_else(|| anyhow!("missing clickhouse_password"))?,
+        )
+        .with_database(
+            config
+                .clickhouse_database
+                .as_ref()
+                .ok_or_else(|| anyhow!("missing clickhouse_database"))?,
+        ))
+}

crates/collab/src/main.rs 🔗

@@ -61,6 +61,7 @@ async fn main() -> Result<()> {
                     Router::new()
                         .route("/", get(handle_root))
                         .route("/healthz", get(handle_liveness_probe))
+                        .merge(collab::api::events::router())
                         .layer(Extension(state.clone())),
                 );
 

crates/collab/src/tests/test_server.rs 🔗

@@ -482,6 +482,7 @@ impl TestServer {
             db: test_db.db().clone(),
             live_kit_client: Some(Arc::new(fake_server.create_api_client())),
             blob_store_client: None,
+            clickhouse_client: None,
             config: Config {
                 http_port: 0,
                 database_url: "".into(),
@@ -499,6 +500,11 @@ impl TestServer {
                 blob_store_access_key: None,
                 blob_store_secret_key: None,
                 blob_store_bucket: None,
+                clickhouse_url: None,
+                clickhouse_user: None,
+                clickhouse_password: None,
+                clickhouse_database: None,
+                zed_client_checksum_seed: None,
             },
         })
     }

crates/telemetry_events/Cargo.toml 🔗

@@ -0,0 +1,13 @@
+[package]
+name = "telemetry_events"
+version = "0.1.0"
+edition = "2021"
+publish = false
+license = "GPL-3.0-or-later"
+
+[lib]
+path = "src/telemetry_events.rs"
+
+[dependencies]
+serde.workspace = true
+util.workspace = true

crates/telemetry_events/src/telemetry_events.rs 🔗

@@ -0,0 +1,131 @@
+use std::fmt::Display;
+
+use serde::{Deserialize, Serialize};
+use util::SemanticVersion;
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct EventRequestBody {
+    pub installation_id: Option<String>,
+    pub session_id: Option<String>,
+    pub is_staff: Option<bool>,
+    pub app_version: String,
+    pub os_name: String,
+    pub os_version: Option<String>,
+    pub architecture: String,
+    pub release_channel: Option<String>,
+    pub events: Vec<EventWrapper>,
+}
+
+impl EventRequestBody {
+    pub fn semver(&self) -> Option<SemanticVersion> {
+        self.app_version.parse().ok()
+    }
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub struct EventWrapper {
+    pub signed_in: bool,
+    pub milliseconds_since_first_event: i64,
+    #[serde(flatten)]
+    pub event: Event,
+}
+
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+#[serde(rename_all = "snake_case")]
+pub enum AssistantKind {
+    Panel,
+    Inline,
+}
+
+impl Display for AssistantKind {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "{}",
+            match self {
+                Self::Panel => "panel",
+                Self::Inline => "inline",
+            }
+        )
+    }
+}
+
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+#[serde(tag = "type")]
+pub enum Event {
+    Editor(EditorEvent),
+    Copilot(CopilotEvent),
+    Call(CallEvent),
+    Assistant(AssistantEvent),
+    Cpu(CpuEvent),
+    Memory(MemoryEvent),
+    App(AppEvent),
+    Setting(SettingEvent),
+    Edit(EditEvent),
+    Action(ActionEvent),
+}
+
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+pub struct EditorEvent {
+    pub operation: String,
+    pub file_extension: Option<String>,
+    pub vim_mode: bool,
+    pub copilot_enabled: bool,
+    pub copilot_enabled_for_language: bool,
+}
+
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+pub struct CopilotEvent {
+    pub suggestion_id: Option<String>,
+    pub suggestion_accepted: bool,
+    pub file_extension: Option<String>,
+}
+
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+pub struct CallEvent {
+    pub operation: String,
+    pub room_id: Option<u64>,
+    pub channel_id: Option<u64>,
+}
+
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+pub struct AssistantEvent {
+    pub conversation_id: Option<String>,
+    pub kind: AssistantKind,
+    pub model: String,
+}
+
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+pub struct CpuEvent {
+    pub usage_as_percentage: f32,
+    pub core_count: u32,
+}
+
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+pub struct MemoryEvent {
+    pub memory_in_bytes: u64,
+    pub virtual_memory_in_bytes: u64,
+}
+
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+pub struct ActionEvent {
+    pub source: String,
+    pub action: String,
+}
+
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+pub struct EditEvent {
+    pub duration: i64,
+    pub environment: String,
+}
+
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+pub struct SettingEvent {
+    pub setting: String,
+    pub value: String,
+}
+
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+pub struct AppEvent {
+    pub operation: String,
+}

crates/util/src/http.rs 🔗

@@ -23,6 +23,19 @@ impl ZedHttpClient {
     pub fn zed_url(&self, path: &str) -> String {
         format!("{}{}", self.zed_host.lock(), path)
     }
+
+    pub fn zed_api_url(&self, path: &str) -> String {
+        let zed_host = self.zed_host.lock().clone();
+
+        let host = match zed_host.as_ref() {
+            "https://zed.dev" => "https://api.zed.dev",
+            "https://staging.zed.dev" => "https://api-staging.zed.dev",
+            "http://localhost:3000" => "http://localhost:8080",
+            other => other,
+        };
+
+        format!("{}{}", host, path)
+    }
 }
 
 impl HttpClient for Arc<ZedHttpClient> {

typos.toml 🔗

@@ -22,5 +22,8 @@ extend-ignore-re = [
     ":ba\\|z",
     # :/ crates/collab/migrations/20231009181554_add_release_channel_to_rooms.sql
     "COLUMN enviroment",
+    # Typo in ClickHouse column name.
+    # crates/collab/src/api/events.rs
+    "rename = \"sesssion_id\""
 ]
 check-filename = true