Rework telemetry code to support sending events to Clickhouse

Joseph Lyons and Max Brunsfeld created

Co-Authored-By: Max Brunsfeld <maxbrunsfeld@gmail.com>

Change summary

Cargo.lock                             |   1 
crates/client/src/client.rs            |  35 +----
crates/client/src/telemetry.rs         | 174 ++++++++++++++++++++++++---
crates/editor/Cargo.toml               |   1 
crates/editor/src/editor.rs            |  28 +++
crates/editor/src/items.rs             |   2 
crates/feedback/src/feedback_editor.rs |   5 
crates/zed/src/main.rs                 |   4 
crates/zed/src/zed.rs                  |   2 
9 files changed, 192 insertions(+), 60 deletions(-)

Detailed changes

Cargo.lock 🔗

@@ -1972,6 +1972,7 @@ version = "0.1.0"
 dependencies = [
  "aho-corasick",
  "anyhow",
+ "client",
  "clock",
  "collections",
  "context_menu",

crates/client/src/client.rs 🔗

@@ -17,7 +17,7 @@ use futures::{
 use gpui::{
     actions,
     platform::AppVersion,
-    serde_json::{self, Value},
+    serde_json::{self},
     AnyModelHandle, AnyWeakModelHandle, AnyWeakViewHandle, AppContext, AsyncAppContext, Entity,
     ModelHandle, Task, View, ViewContext, WeakViewHandle,
 };
@@ -27,7 +27,7 @@ use postage::watch;
 use rand::prelude::*;
 use rpc::proto::{AnyTypedEnvelope, EntityMessage, EnvelopedMessage, PeerId, RequestMessage};
 use serde::Deserialize;
-use settings::{Settings, TelemetrySettings};
+use settings::Settings;
 use std::{
     any::TypeId,
     collections::HashMap,
@@ -47,6 +47,7 @@ use util::http::HttpClient;
 use util::{ResultExt, TryFutureExt};
 
 pub use rpc::*;
+pub use telemetry::ClickhouseEvent;
 pub use user::*;
 
 lazy_static! {
@@ -736,7 +737,7 @@ impl Client {
             read_from_keychain = credentials.is_some();
             if read_from_keychain {
                 cx.read(|cx| {
-                    self.report_event(
+                    self.telemetry().report_mixpanel_event(
                         "read credentials from keychain",
                         Default::default(),
                         cx.global::<Settings>().telemetry(),
@@ -1116,7 +1117,7 @@ impl Client {
                 .context("failed to decrypt access token")?;
             platform.activate(true);
 
-            telemetry.report_event(
+            telemetry.report_mixpanel_event(
                 "authenticate with browser",
                 Default::default(),
                 metrics_enabled,
@@ -1338,30 +1339,8 @@ impl Client {
         }
     }
 
-    pub fn start_telemetry(&self) {
-        self.telemetry.start();
-    }
-
-    pub fn report_event(
-        &self,
-        kind: &str,
-        properties: Value,
-        telemetry_settings: TelemetrySettings,
-    ) {
-        self.telemetry
-            .report_event(kind, properties.clone(), telemetry_settings);
-    }
-
-    pub fn telemetry_log_file_path(&self) -> Option<PathBuf> {
-        self.telemetry.log_file_path()
-    }
-
-    pub fn metrics_id(&self) -> Option<Arc<str>> {
-        self.telemetry.metrics_id()
-    }
-
-    pub fn is_staff(&self) -> Option<bool> {
-        self.telemetry.is_staff()
+    pub fn telemetry(&self) -> &Arc<Telemetry> {
+        &self.telemetry
     }
 }
 

crates/client/src/telemetry.rs 🔗

@@ -1,3 +1,4 @@
+use crate::{ZED_SECRET_CLIENT_TOKEN, ZED_SERVER_URL};
 use db::kvp::KEY_VALUE_STORE;
 use gpui::{
     executor::Background,
@@ -35,20 +36,56 @@ struct TelemetryState {
     release_channel: Option<&'static str>,
     os_version: Option<Arc<str>>,
     os_name: &'static str,
-    queue: Vec<MixpanelEvent>,
-    next_event_id: usize,
-    flush_task: Option<Task<()>>,
+    mixpanel_events_queue: Vec<MixpanelEvent>, // Mixpanel mixed events - will hopefully die soon
+    clickhouse_events_queue: Vec<ClickhouseEventWrapper>,
+    next_mixpanel_event_id: usize,
+    flush_mixpanel_events_task: Option<Task<()>>,
+    flush_clickhouse_events_task: Option<Task<()>>,
     log_file: Option<NamedTempFile>,
     is_staff: Option<bool>,
 }
 
 const MIXPANEL_EVENTS_URL: &'static str = "https://api.mixpanel.com/track";
 const MIXPANEL_ENGAGE_URL: &'static str = "https://api.mixpanel.com/engage#profile-set";
+const CLICKHOUSE_EVENTS_URL_PATH: &'static str = "/api/events";
 
 lazy_static! {
     static ref MIXPANEL_TOKEN: Option<String> = std::env::var("ZED_MIXPANEL_TOKEN")
         .ok()
         .or_else(|| option_env!("ZED_MIXPANEL_TOKEN").map(|key| key.to_string()));
+    static ref CLICKHOUSE_EVENTS_URL: String =
+        format!("{}{}", *ZED_SERVER_URL, CLICKHOUSE_EVENTS_URL_PATH);
+}
+
+#[derive(Serialize, Debug)]
+struct ClickhouseEventRequestBody {
+    token: &'static str,
+    installation_id: Option<Arc<str>>,
+    app_version: Option<Arc<str>>,
+    os_name: &'static str,
+    os_version: Option<Arc<str>>,
+    release_channel: Option<&'static str>,
+    events: Vec<ClickhouseEventWrapper>,
+}
+
+#[derive(Serialize, Debug)]
+struct ClickhouseEventWrapper {
+    time: u128,
+    signed_in: bool,
+    #[serde(flatten)]
+    event: ClickhouseEvent,
+}
+
+#[derive(Serialize, Debug)]
+#[serde(tag = "type")]
+pub enum ClickhouseEvent {
+    Editor {
+        operation: &'static str,
+        file_extension: Option<String>,
+        vim_mode: bool,
+        copilot_enabled: bool,
+        copilot_enabled_for_language: bool,
+    },
 }
 
 #[derive(Serialize, Debug)]
@@ -121,9 +158,11 @@ impl Telemetry {
                 release_channel,
                 device_id: None,
                 metrics_id: None,
-                queue: Default::default(),
-                flush_task: Default::default(),
-                next_event_id: 0,
+                mixpanel_events_queue: Default::default(),
+                clickhouse_events_queue: Default::default(),
+                flush_mixpanel_events_task: Default::default(),
+                flush_clickhouse_events_task: Default::default(),
+                next_mixpanel_event_id: 0,
                 log_file: None,
                 is_staff: None,
             }),
@@ -168,15 +207,24 @@ impl Telemetry {
                     let device_id: Arc<str> = device_id.into();
                     let mut state = this.state.lock();
                     state.device_id = Some(device_id.clone());
-                    for event in &mut state.queue {
+
+                    for event in &mut state.mixpanel_events_queue {
                         event
                             .properties
                             .distinct_id
                             .get_or_insert_with(|| device_id.clone());
                     }
-                    if !state.queue.is_empty() {
-                        drop(state);
-                        this.flush();
+
+                    let has_mixpanel_events = !state.mixpanel_events_queue.is_empty();
+                    let has_clickhouse_events = !state.clickhouse_events_queue.is_empty();
+                    drop(state);
+
+                    if has_mixpanel_events {
+                        this.flush_mixpanel_events();
+                    }
+
+                    if has_clickhouse_events {
+                        this.flush_clickhouse_events();
                     }
 
                     anyhow::Ok(())
@@ -231,7 +279,42 @@ impl Telemetry {
         }
     }
 
-    pub fn report_event(
+    pub fn report_clickhouse_event(
+        self: &Arc<Self>,
+        event: ClickhouseEvent,
+        telemetry_settings: TelemetrySettings,
+    ) {
+        if !telemetry_settings.metrics() {
+            return;
+        }
+
+        let mut state = self.state.lock();
+        let signed_in = state.metrics_id.is_some();
+        state.clickhouse_events_queue.push(ClickhouseEventWrapper {
+            time: SystemTime::now()
+                .duration_since(UNIX_EPOCH)
+                .unwrap()
+                .as_millis(),
+            signed_in,
+            event,
+        });
+
+        if state.device_id.is_some() {
+            if state.mixpanel_events_queue.len() >= MAX_QUEUE_LEN {
+                drop(state);
+                self.flush_clickhouse_events();
+            } else {
+                let this = self.clone();
+                let executor = self.executor.clone();
+                state.flush_clickhouse_events_task = Some(self.executor.spawn(async move {
+                    executor.timer(DEBOUNCE_INTERVAL).await;
+                    this.flush_clickhouse_events();
+                }));
+            }
+        }
+    }
+
+    pub fn report_mixpanel_event(
         self: &Arc<Self>,
         kind: &str,
         properties: Value,
@@ -243,7 +326,7 @@ impl Telemetry {
 
         let mut state = self.state.lock();
         let event = MixpanelEvent {
-            event: kind.to_string(),
+            event: kind.into(),
             properties: MixpanelEventProperties {
                 token: "",
                 time: SystemTime::now()
@@ -251,7 +334,7 @@ impl Telemetry {
                     .unwrap()
                     .as_millis(),
                 distinct_id: state.device_id.clone(),
-                insert_id: post_inc(&mut state.next_event_id),
+                insert_id: post_inc(&mut state.next_mixpanel_event_id),
                 event_properties: if let Value::Object(properties) = properties {
                     Some(properties)
                 } else {
@@ -264,17 +347,17 @@ impl Telemetry {
                 signed_in: state.metrics_id.is_some(),
             },
         };
-        state.queue.push(event);
+        state.mixpanel_events_queue.push(event);
         if state.device_id.is_some() {
-            if state.queue.len() >= MAX_QUEUE_LEN {
+            if state.mixpanel_events_queue.len() >= MAX_QUEUE_LEN {
                 drop(state);
-                self.flush();
+                self.flush_mixpanel_events();
             } else {
                 let this = self.clone();
                 let executor = self.executor.clone();
-                state.flush_task = Some(self.executor.spawn(async move {
+                state.flush_mixpanel_events_task = Some(self.executor.spawn(async move {
                     executor.timer(DEBOUNCE_INTERVAL).await;
-                    this.flush();
+                    this.flush_mixpanel_events();
                 }));
             }
         }
@@ -288,10 +371,10 @@ impl Telemetry {
         self.state.lock().is_staff
     }
 
-    fn flush(self: &Arc<Self>) {
+    fn flush_mixpanel_events(self: &Arc<Self>) {
         let mut state = self.state.lock();
-        let mut events = mem::take(&mut state.queue);
-        state.flush_task.take();
+        let mut events = mem::take(&mut state.mixpanel_events_queue);
+        state.flush_mixpanel_events_task.take();
         drop(state);
 
         if let Some(token) = MIXPANEL_TOKEN.as_ref() {
@@ -325,4 +408,53 @@ impl Telemetry {
                 .detach();
         }
     }
+
+    fn flush_clickhouse_events(self: &Arc<Self>) {
+        let mut state = self.state.lock();
+        let mut events = mem::take(&mut state.clickhouse_events_queue);
+        state.flush_clickhouse_events_task.take();
+        drop(state);
+
+        let this = self.clone();
+        self.executor
+            .spawn(
+                async move {
+                    let mut json_bytes = Vec::new();
+
+                    if let Some(file) = &mut this.state.lock().log_file {
+                        let file = file.as_file_mut();
+                        for event in &mut events {
+                            json_bytes.clear();
+                            serde_json::to_writer(&mut json_bytes, event)?;
+                            file.write_all(&json_bytes)?;
+                            file.write(b"\n")?;
+                        }
+                    }
+
+                    {
+                        let state = this.state.lock();
+                        json_bytes.clear();
+                        serde_json::to_writer(
+                            &mut json_bytes,
+                            &ClickhouseEventRequestBody {
+                                token: ZED_SECRET_CLIENT_TOKEN,
+                                installation_id: state.device_id.clone(),
+                                app_version: state.app_version.clone(),
+                                os_name: state.os_name,
+                                os_version: state.os_version.clone(),
+                                release_channel: state.release_channel,
+                                events,
+                            },
+                        )?;
+                    }
+
+                    this.http_client
+                        .post_json(CLICKHOUSE_EVENTS_URL.as_str(), json_bytes.into())
+                        .await?;
+                    anyhow::Ok(())
+                }
+                .log_err(),
+            )
+            .detach();
+    }
 }

crates/editor/Cargo.toml 🔗

@@ -23,6 +23,7 @@ test-support = [
 ]
 
 [dependencies]
+client = { path = "../client" }
 clock = { path = "../clock" }
 copilot = { path = "../copilot" }
 db = { path = "../db" }

crates/editor/src/editor.rs 🔗

@@ -22,6 +22,7 @@ pub mod test;
 use aho_corasick::AhoCorasick;
 use anyhow::{anyhow, Result};
 use blink_manager::BlinkManager;
+use client::ClickhouseEvent;
 use clock::ReplicaId;
 use collections::{BTreeMap, Bound, HashMap, HashSet, VecDeque};
 use copilot::Copilot;
@@ -1295,7 +1296,7 @@ impl Editor {
             cx.set_global(ScrollbarAutoHide(should_auto_hide_scrollbars));
         }
 
-        this.report_event("open editor", cx);
+        this.report_editor_event("open", cx);
         this
     }
 
@@ -6819,7 +6820,7 @@ impl Editor {
             .collect()
     }
 
-    fn report_event(&self, name: &str, cx: &AppContext) {
+    fn report_editor_event(&self, name: &'static str, cx: &AppContext) {
         if let Some((project, file)) = self.project.as_ref().zip(
             self.buffer
                 .read(cx)
@@ -6831,11 +6832,28 @@ impl Editor {
             let extension = Path::new(file.file_name(cx))
                 .extension()
                 .and_then(|e| e.to_str());
-            project.read(cx).client().report_event(
-                name,
-                json!({ "File Extension": extension, "Vim Mode": settings.vim_mode  }),
+            let telemetry = project.read(cx).client().telemetry().clone();
+            telemetry.report_mixpanel_event(
+                match name {
+                    "open" => "open editor",
+                    "save" => "save editor",
+                    _ => name,
+                },
+                json!({ "File Extension": extension, "Vim Mode": settings.vim_mode, "In Clickhouse": true  }),
                 settings.telemetry(),
             );
+            let event = ClickhouseEvent::Editor {
+                file_extension: extension.map(ToString::to_string),
+                vim_mode: settings.vim_mode,
+                operation: name,
+                copilot_enabled: settings.features.copilot,
+                copilot_enabled_for_language: settings.show_copilot_suggestions(
+                    self.language_at(0, cx)
+                        .map(|language| language.name())
+                        .as_deref(),
+                ),
+            };
+            telemetry.report_clickhouse_event(event, settings.telemetry())
         }
     }
 

crates/editor/src/items.rs 🔗

@@ -636,7 +636,7 @@ impl Item for Editor {
         project: ModelHandle<Project>,
         cx: &mut ViewContext<Self>,
     ) -> Task<Result<()>> {
-        self.report_event("save editor", cx);
+        self.report_editor_event("save", cx);
         let format = self.perform_format(project.clone(), FormatTrigger::Save, cx);
         let buffers = self.buffer().clone().read(cx).all_buffers();
         cx.spawn(|_, mut cx| async move {

crates/feedback/src/feedback_editor.rs 🔗

@@ -164,8 +164,9 @@ impl FeedbackEditor {
     ) -> anyhow::Result<()> {
         let feedback_endpoint = format!("{}/api/feedback", *ZED_SERVER_URL);
 
-        let metrics_id = zed_client.metrics_id();
-        let is_staff = zed_client.is_staff();
+        let telemetry = zed_client.telemetry();
+        let metrics_id = telemetry.metrics_id();
+        let is_staff = telemetry.is_staff();
         let http_client = zed_client.http_client();
 
         let request = FeedbackRequestBody {

crates/zed/src/main.rs 🔗

@@ -172,8 +172,8 @@ fn main() {
         })
         .detach();
 
-        client.start_telemetry();
-        client.report_event(
+        client.telemetry().start();
+        client.telemetry().report_mixpanel_event(
             "start app",
             Default::default(),
             cx.global::<Settings>().telemetry(),

crates/zed/src/zed.rs 🔗

@@ -573,7 +573,7 @@ fn open_telemetry_log_file(
     workspace.with_local_workspace(&app_state.clone(), cx, move |_, cx| {
         cx.spawn(|workspace, mut cx| async move {
             async fn fetch_log_string(app_state: &Arc<AppState>) -> Option<String> {
-                let path = app_state.client.telemetry_log_file_path()?;
+                let path = app_state.client.telemetry().log_file_path()?;
                 app_state.fs.load(&path).await.log_err()
             }