diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 866d0acc0eb40355a09a0811e216cb8c5c0eeba0..3c9c9481e73aa6c02882b170213dfbce942ed6c9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -57,6 +57,7 @@ jobs: APPLE_NOTARIZATION_USERNAME: ${{ secrets.APPLE_NOTARIZATION_USERNAME }} APPLE_NOTARIZATION_PASSWORD: ${{ secrets.APPLE_NOTARIZATION_PASSWORD }} ZED_AMPLITUDE_API_KEY: ${{ secrets.ZED_AMPLITUDE_API_KEY }} + ZED_MIXPANEL_TOKEN: ${{ secrets.ZED_MIXPANEL_TOKEN }} steps: - name: Install Rust run: | diff --git a/.github/workflows/release_actions.yml b/.github/workflows/release_actions.yml index 9a3b2376df472eba470dc92fca7293d8a015206a..17cb8864e15f8c2994cb5c738d16458b89463f2c 100644 --- a/.github/workflows/release_actions.yml +++ b/.github/workflows/release_actions.yml @@ -30,4 +30,4 @@ jobs: architecture: "x64" cache: "pip" - run: pip install -r script/amplitude_release/requirements.txt - - run: python script/amplitude_release/main.py ${{ github.event.release.tag_name }} ${{ secrets.ZED_AMPLITUDE_API_KEY }} ${{ secrets.ZED_AMPLITUDE_SECRET_KEY }} \ No newline at end of file + - run: python script/amplitude_release/main.py ${{ github.event.release.tag_name }} ${{ secrets.ZED_AMPLITUDE_API_KEY }} ${{ secrets.ZED_AMPLITUDE_SECRET_KEY }} diff --git a/crates/client/src/amplitude_telemetry.rs b/crates/client/src/amplitude_telemetry.rs new file mode 100644 index 0000000000000000000000000000000000000000..5db2bedf03016b59fc81c54fa5d1cf0f999e5140 --- /dev/null +++ b/crates/client/src/amplitude_telemetry.rs @@ -0,0 +1,277 @@ +use crate::http::HttpClient; +use db::Db; +use gpui::{ + executor::Background, + serde_json::{self, value::Map, Value}, + AppContext, Task, +}; +use isahc::Request; +use lazy_static::lazy_static; +use parking_lot::Mutex; +use serde::Serialize; +use serde_json::json; +use std::{ + io::Write, + mem, + path::PathBuf, + sync::Arc, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; +use tempfile::NamedTempFile; +use util::{post_inc, ResultExt, TryFutureExt}; +use uuid::Uuid; + +pub struct AmplitudeTelemetry { + http_client: Arc, + executor: Arc, + session_id: u128, + state: Mutex, +} + +#[derive(Default)] +struct AmplitudeTelemetryState { + metrics_id: Option>, + device_id: Option>, + app_version: Option>, + os_version: Option>, + os_name: &'static str, + queue: Vec, + next_event_id: usize, + flush_task: Option>, + log_file: Option, +} + +const AMPLITUDE_EVENTS_URL: &'static str = "https://api2.amplitude.com/batch"; + +lazy_static! { + static ref AMPLITUDE_API_KEY: Option = std::env::var("ZED_AMPLITUDE_API_KEY") + .ok() + .or_else(|| option_env!("ZED_AMPLITUDE_API_KEY").map(|key| key.to_string())); +} + +#[derive(Serialize)] +struct AmplitudeEventBatch { + api_key: &'static str, + events: Vec, +} + +#[derive(Serialize)] +struct AmplitudeEvent { + #[serde(skip_serializing_if = "Option::is_none")] + user_id: Option>, + device_id: Option>, + event_type: String, + #[serde(skip_serializing_if = "Option::is_none")] + event_properties: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + user_properties: Option>, + os_name: &'static str, + os_version: Option>, + app_version: Option>, + platform: &'static str, + event_id: usize, + session_id: u128, + time: u128, +} + +#[cfg(debug_assertions)] +const MAX_QUEUE_LEN: usize = 1; + +#[cfg(not(debug_assertions))] +const MAX_QUEUE_LEN: usize = 10; + +#[cfg(debug_assertions)] +const DEBOUNCE_INTERVAL: Duration = Duration::from_secs(1); + +#[cfg(not(debug_assertions))] +const DEBOUNCE_INTERVAL: Duration = Duration::from_secs(30); + +impl AmplitudeTelemetry { + pub fn new(client: Arc, cx: &AppContext) -> Arc { + let platform = cx.platform(); + let this = Arc::new(Self { + http_client: client, + executor: cx.background().clone(), + session_id: SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis(), + state: Mutex::new(AmplitudeTelemetryState { + os_version: platform + .os_version() + .log_err() + .map(|v| v.to_string().into()), + os_name: platform.os_name().into(), + app_version: platform + .app_version() + .log_err() + .map(|v| v.to_string().into()), + device_id: None, + queue: Default::default(), + flush_task: Default::default(), + next_event_id: 0, + log_file: None, + metrics_id: None, + }), + }); + + if AMPLITUDE_API_KEY.is_some() { + this.executor + .spawn({ + let this = this.clone(); + async move { + if let Some(tempfile) = NamedTempFile::new().log_err() { + this.state.lock().log_file = Some(tempfile); + } + } + }) + .detach(); + } + + this + } + + pub fn log_file_path(&self) -> Option { + Some(self.state.lock().log_file.as_ref()?.path().to_path_buf()) + } + + pub fn start(self: &Arc, db: Db) { + let this = self.clone(); + self.executor + .spawn( + async move { + let device_id = if let Ok(Some(device_id)) = db.read_kvp("device_id") { + device_id + } else { + let device_id = Uuid::new_v4().to_string(); + db.write_kvp("device_id", &device_id)?; + device_id + }; + + let device_id = Some(Arc::from(device_id)); + let mut state = this.state.lock(); + state.device_id = device_id.clone(); + for event in &mut state.queue { + event.device_id = device_id.clone(); + } + if !state.queue.is_empty() { + drop(state); + this.flush(); + } + + anyhow::Ok(()) + } + .log_err(), + ) + .detach(); + } + + pub fn set_authenticated_user_info( + self: &Arc, + metrics_id: Option, + is_staff: bool, + ) { + let is_signed_in = metrics_id.is_some(); + self.state.lock().metrics_id = metrics_id.map(|s| s.into()); + if is_signed_in { + self.report_event_with_user_properties( + "$identify", + Default::default(), + json!({ "$set": { "staff": is_staff } }), + ) + } + } + + pub fn report_event(self: &Arc, kind: &str, properties: Value) { + self.report_event_with_user_properties(kind, properties, Default::default()); + } + + fn report_event_with_user_properties( + self: &Arc, + kind: &str, + properties: Value, + user_properties: Value, + ) { + if AMPLITUDE_API_KEY.is_none() { + return; + } + + let mut state = self.state.lock(); + let event = AmplitudeEvent { + event_type: kind.to_string(), + time: SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis(), + session_id: self.session_id, + event_properties: if let Value::Object(properties) = properties { + Some(properties) + } else { + None + }, + user_properties: if let Value::Object(user_properties) = user_properties { + Some(user_properties) + } else { + None + }, + user_id: state.metrics_id.clone(), + device_id: state.device_id.clone(), + os_name: state.os_name, + platform: "Zed", + os_version: state.os_version.clone(), + app_version: state.app_version.clone(), + event_id: post_inc(&mut state.next_event_id), + }; + state.queue.push(event); + if state.device_id.is_some() { + if state.queue.len() >= MAX_QUEUE_LEN { + drop(state); + self.flush(); + } else { + let this = self.clone(); + let executor = self.executor.clone(); + state.flush_task = Some(self.executor.spawn(async move { + executor.timer(DEBOUNCE_INTERVAL).await; + this.flush(); + })); + } + } + } + + fn flush(self: &Arc) { + let mut state = self.state.lock(); + let events = mem::take(&mut state.queue); + state.flush_task.take(); + drop(state); + + if let Some(api_key) = AMPLITUDE_API_KEY.as_ref() { + let this = self.clone(); + self.executor + .spawn( + async move { + let mut json_bytes = Vec::new(); + + if let Some(file) = &mut this.state.lock().log_file { + let file = file.as_file_mut(); + for event in &events { + json_bytes.clear(); + serde_json::to_writer(&mut json_bytes, event)?; + file.write_all(&json_bytes)?; + file.write(b"\n")?; + } + } + + let batch = AmplitudeEventBatch { api_key, events }; + json_bytes.clear(); + serde_json::to_writer(&mut json_bytes, &batch)?; + let request = + Request::post(AMPLITUDE_EVENTS_URL).body(json_bytes.into())?; + this.http_client.send(request).await?; + Ok(()) + } + .log_err(), + ) + .detach(); + } + } +} diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index 3878cc90c3826ebbb64b924be2b847d53858f878..8f6d4aa10d798686d1b59e3c46c3129beeffd280 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -1,11 +1,13 @@ #[cfg(any(test, feature = "test-support"))] pub mod test; +pub mod amplitude_telemetry; pub mod channel; pub mod http; pub mod telemetry; pub mod user; +use amplitude_telemetry::AmplitudeTelemetry; use anyhow::{anyhow, Context, Result}; use async_recursion::async_recursion; use async_tungstenite::tungstenite::{ @@ -82,6 +84,7 @@ pub struct Client { peer: Arc, http: Arc, telemetry: Arc, + amplitude_telemetry: Arc, state: RwLock, #[allow(clippy::type_complexity)] @@ -261,6 +264,7 @@ impl Client { id: 0, peer: Peer::new(), telemetry: Telemetry::new(http.clone(), cx), + amplitude_telemetry: AmplitudeTelemetry::new(http.clone(), cx), http, state: Default::default(), @@ -373,6 +377,8 @@ impl Client { } Status::SignedOut | Status::UpgradeRequired => { self.telemetry.set_authenticated_user_info(None, false); + self.amplitude_telemetry + .set_authenticated_user_info(None, false); state._reconnect_task.take(); } _ => {} @@ -1013,6 +1019,7 @@ impl Client { let platform = cx.platform(); let executor = cx.background(); let telemetry = self.telemetry.clone(); + let amplitude_telemetry = self.amplitude_telemetry.clone(); let http = self.http.clone(); executor.clone().spawn(async move { // Generate a pair of asymmetric encryption keys. The public key will be used by the @@ -1097,6 +1104,7 @@ impl Client { platform.activate(true); telemetry.report_event("authenticate with browser", Default::default()); + amplitude_telemetry.report_event("authenticate with browser", Default::default()); Ok(Credentials { user_id: user_id.parse()?, @@ -1208,14 +1216,17 @@ impl Client { } pub fn start_telemetry(&self, db: Db) { - self.telemetry.start(db); + self.telemetry.start(db.clone()); + self.amplitude_telemetry.start(db); } pub fn report_event(&self, kind: &str, properties: Value) { - self.telemetry.report_event(kind, properties) + self.telemetry.report_event(kind, properties.clone()); + self.amplitude_telemetry.report_event(kind, properties); } pub fn telemetry_log_file_path(&self) -> Option { + self.amplitude_telemetry.log_file_path(); self.telemetry.log_file_path() } } diff --git a/crates/client/src/telemetry.rs b/crates/client/src/telemetry.rs index 6829eab53150901755c1f3d89025f3076acd34f0..02c1790664c28758cf366e43948b4aa2396032ec 100644 --- a/crates/client/src/telemetry.rs +++ b/crates/client/src/telemetry.rs @@ -24,7 +24,6 @@ use uuid::Uuid; pub struct Telemetry { http_client: Arc, executor: Arc, - session_id: u128, state: Mutex, } @@ -35,43 +34,54 @@ struct TelemetryState { app_version: Option>, os_version: Option>, os_name: &'static str, - queue: Vec, + queue: Vec, next_event_id: usize, flush_task: Option>, log_file: Option, } -const AMPLITUDE_EVENTS_URL: &'static str = "https://api2.amplitude.com/batch"; +const MIXPANEL_EVENTS_URL: &'static str = "https://api.mixpanel.com/track"; +const MIXPANEL_ENGAGE_URL: &'static str = "https://api.mixpanel.com/engage#profile-set"; lazy_static! { - static ref AMPLITUDE_API_KEY: Option = std::env::var("ZED_AMPLITUDE_API_KEY") + static ref MIXPANEL_TOKEN: Option = std::env::var("ZED_MIXPANEL_TOKEN") .ok() - .or_else(|| option_env!("ZED_AMPLITUDE_API_KEY").map(|key| key.to_string())); + .or_else(|| option_env!("ZED_MIXPANEL_TOKEN").map(|key| key.to_string())); } -#[derive(Serialize)] -struct AmplitudeEventBatch { - api_key: &'static str, - events: Vec, +#[derive(Serialize, Debug)] +struct MixpanelEvent { + event: String, + properties: MixpanelEventProperties, } -#[derive(Serialize)] -struct AmplitudeEvent { - #[serde(skip_serializing_if = "Option::is_none")] - user_id: Option>, - device_id: Option>, - event_type: String, - #[serde(skip_serializing_if = "Option::is_none")] +#[derive(Serialize, Debug)] +struct MixpanelEventProperties { + // Mixpanel required fields + #[serde(skip_serializing_if = "str::is_empty")] + token: &'static str, + time: u128, + distinct_id: Option>, + #[serde(rename = "$insert_id")] + insert_id: usize, + // Custom fields + #[serde(skip_serializing_if = "Option::is_none", flatten)] event_properties: Option>, - #[serde(skip_serializing_if = "Option::is_none")] - user_properties: Option>, os_name: &'static str, os_version: Option>, app_version: Option>, + signed_in: bool, platform: &'static str, - event_id: usize, - session_id: u128, - time: u128, +} + +#[derive(Serialize)] +struct MixpanelEngageRequest { + #[serde(rename = "$token")] + token: &'static str, + #[serde(rename = "$distinct_id")] + distinct_id: Arc, + #[serde(rename = "$set")] + set: Value, } #[cfg(debug_assertions)] @@ -92,10 +102,6 @@ impl Telemetry { let this = Arc::new(Self { http_client: client, executor: cx.background().clone(), - session_id: SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_millis(), state: Mutex::new(TelemetryState { os_version: platform .os_version() @@ -107,15 +113,15 @@ impl Telemetry { .log_err() .map(|v| v.to_string().into()), device_id: None, + metrics_id: None, queue: Default::default(), flush_task: Default::default(), next_event_id: 0, log_file: None, - metrics_id: None, }), }); - if AMPLITUDE_API_KEY.is_some() { + if MIXPANEL_TOKEN.is_some() { this.executor .spawn({ let this = this.clone(); @@ -148,11 +154,14 @@ impl Telemetry { device_id }; - let device_id = Some(Arc::from(device_id)); + let device_id: Arc = device_id.into(); let mut state = this.state.lock(); - state.device_id = device_id.clone(); + state.device_id = Some(device_id.clone()); for event in &mut state.queue { - event.device_id = device_id.clone(); + event + .properties + .distinct_id + .get_or_insert_with(|| device_id.clone()); } if !state.queue.is_empty() { drop(state); @@ -171,56 +180,57 @@ impl Telemetry { metrics_id: Option, is_staff: bool, ) { - let is_signed_in = metrics_id.is_some(); - self.state.lock().metrics_id = metrics_id.map(|s| s.into()); - if is_signed_in { - self.report_event_with_user_properties( - "$identify", - Default::default(), - json!({ "$set": { "staff": is_staff } }), - ) + let this = self.clone(); + let mut state = self.state.lock(); + let device_id = state.device_id.clone(); + let metrics_id: Option> = metrics_id.map(|id| id.into()); + state.metrics_id = metrics_id.clone(); + drop(state); + + if let Some((token, device_id)) = MIXPANEL_TOKEN.as_ref().zip(device_id) { + self.executor + .spawn( + async move { + let json_bytes = serde_json::to_vec(&[MixpanelEngageRequest { + token, + distinct_id: device_id, + set: json!({ "staff": is_staff, "id": metrics_id }), + }])?; + let request = Request::post(MIXPANEL_ENGAGE_URL) + .header("Content-Type", "application/json") + .body(json_bytes.into())?; + this.http_client.send(request).await?; + Ok(()) + } + .log_err(), + ) + .detach(); } } pub fn report_event(self: &Arc, kind: &str, properties: Value) { - self.report_event_with_user_properties(kind, properties, Default::default()); - } - - fn report_event_with_user_properties( - self: &Arc, - kind: &str, - properties: Value, - user_properties: Value, - ) { - if AMPLITUDE_API_KEY.is_none() { - return; - } - let mut state = self.state.lock(); - let event = AmplitudeEvent { - event_type: kind.to_string(), - time: SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_millis(), - session_id: self.session_id, - event_properties: if let Value::Object(properties) = properties { - Some(properties) - } else { - None - }, - user_properties: if let Value::Object(user_properties) = user_properties { - Some(user_properties) - } else { - None + let event = MixpanelEvent { + event: kind.to_string(), + properties: MixpanelEventProperties { + token: "", + time: SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis(), + distinct_id: state.device_id.clone(), + insert_id: post_inc(&mut state.next_event_id), + event_properties: if let Value::Object(properties) = properties { + Some(properties) + } else { + None + }, + os_name: state.os_name, + os_version: state.os_version.clone(), + app_version: state.app_version.clone(), + signed_in: state.metrics_id.is_some(), + platform: "Zed", }, - user_id: state.metrics_id.clone(), - device_id: state.device_id.clone(), - os_name: state.os_name, - platform: "Zed", - os_version: state.os_version.clone(), - app_version: state.app_version.clone(), - event_id: post_inc(&mut state.next_event_id), }; state.queue.push(event); if state.device_id.is_some() { @@ -240,11 +250,11 @@ impl Telemetry { fn flush(self: &Arc) { let mut state = self.state.lock(); - let events = mem::take(&mut state.queue); + let mut events = mem::take(&mut state.queue); state.flush_task.take(); drop(state); - if let Some(api_key) = AMPLITUDE_API_KEY.as_ref() { + if let Some(token) = MIXPANEL_TOKEN.as_ref() { let this = self.clone(); self.executor .spawn( @@ -253,19 +263,21 @@ impl Telemetry { if let Some(file) = &mut this.state.lock().log_file { let file = file.as_file_mut(); - for event in &events { + for event in &mut events { json_bytes.clear(); serde_json::to_writer(&mut json_bytes, event)?; file.write_all(&json_bytes)?; file.write(b"\n")?; + + event.properties.token = token; } } - let batch = AmplitudeEventBatch { api_key, events }; json_bytes.clear(); - serde_json::to_writer(&mut json_bytes, &batch)?; - let request = - Request::post(AMPLITUDE_EVENTS_URL).body(json_bytes.into())?; + serde_json::to_writer(&mut json_bytes, &events)?; + let request = Request::post(MIXPANEL_EVENTS_URL) + .header("Content-Type", "application/json") + .body(json_bytes.into())?; this.http_client.send(request).await?; Ok(()) } diff --git a/crates/client/src/user.rs b/crates/client/src/user.rs index 3c3d7e7fb3e199e16e28c106deda13d473a9c840..d06a6682c5e0fb2589d19fa23909e0988794b9bc 100644 --- a/crates/client/src/user.rs +++ b/crates/client/src/user.rs @@ -143,13 +143,24 @@ impl UserStore { let (user, info) = futures::join!(fetch_user, fetch_metrics_id); if let Some(info) = info { client.telemetry.set_authenticated_user_info( + Some(info.metrics_id.clone()), + info.staff, + ); + client.amplitude_telemetry.set_authenticated_user_info( Some(info.metrics_id), info.staff, ); } else { client.telemetry.set_authenticated_user_info(None, false); + client + .amplitude_telemetry + .set_authenticated_user_info(None, false); } + client.telemetry.report_event("sign in", Default::default()); + client + .amplitude_telemetry + .report_event("sign in", Default::default()); current_user_tx.send(user).await.ok(); } } diff --git a/crates/zed/build.rs b/crates/zed/build.rs index f030f98e14b27b11075ec7dbea2ce194dd4ecc31..8f56bcd34069b97e72aeb3b5889970f10d3101c7 100644 --- a/crates/zed/build.rs +++ b/crates/zed/build.rs @@ -3,6 +3,9 @@ use std::process::Command; fn main() { println!("cargo:rustc-env=MACOSX_DEPLOYMENT_TARGET=10.14"); + if let Ok(api_key) = std::env::var("ZED_MIXPANEL_TOKEN") { + println!("cargo:rustc-env=ZED_MIXPANEL_TOKEN={api_key}"); + } if let Ok(api_key) = std::env::var("ZED_AMPLITUDE_API_KEY") { println!("cargo:rustc-env=ZED_AMPLITUDE_API_KEY={api_key}"); }