@@ -24,7 +24,6 @@ use uuid::Uuid;
pub struct Telemetry {
http_client: Arc<dyn HttpClient>,
executor: Arc<Background>,
- session_id: u128,
state: Mutex<TelemetryState>,
}
@@ -35,43 +34,54 @@ struct TelemetryState {
app_version: Option<Arc<str>>,
os_version: Option<Arc<str>>,
os_name: &'static str,
- queue: Vec<AmplitudeEvent>,
+ queue: Vec<MixpanelEvent>,
next_event_id: usize,
flush_task: Option<Task<()>>,
log_file: Option<NamedTempFile>,
}
-const AMPLITUDE_EVENTS_URL: &'static str = "https://api2.amplitude.com/batch";
+const MIXPANEL_EVENTS_URL: &'static str = "https://api.mixpanel.com/track";
+const MIXPANEL_ENGAGE_URL: &'static str = "https://api.mixpanel.com/engage#profile-set";
lazy_static! {
- static ref AMPLITUDE_API_KEY: Option<String> = std::env::var("ZED_AMPLITUDE_API_KEY")
+ static ref MIXPANEL_TOKEN: Option<String> = std::env::var("ZED_MIXPANEL_TOKEN")
.ok()
- .or_else(|| option_env!("ZED_AMPLITUDE_API_KEY").map(|key| key.to_string()));
+ .or_else(|| option_env!("ZED_MIXPANEL_TOKEN").map(|key| key.to_string()));
}
-#[derive(Serialize)]
-struct AmplitudeEventBatch {
- api_key: &'static str,
- events: Vec<AmplitudeEvent>,
+#[derive(Serialize, Debug)]
+struct MixpanelEvent {
+ event: String,
+ properties: MixpanelEventProperties,
}
-#[derive(Serialize)]
-struct AmplitudeEvent {
- #[serde(skip_serializing_if = "Option::is_none")]
- user_id: Option<Arc<str>>,
- device_id: Option<Arc<str>>,
- event_type: String,
- #[serde(skip_serializing_if = "Option::is_none")]
+#[derive(Serialize, Debug)]
+struct MixpanelEventProperties {
+ // Mixpanel required fields
+ #[serde(skip_serializing_if = "str::is_empty")]
+ token: &'static str,
+ time: u128,
+ distinct_id: Option<Arc<str>>,
+ #[serde(rename = "$insert_id")]
+ insert_id: usize,
+ // Custom fields
+ #[serde(skip_serializing_if = "Option::is_none", flatten)]
event_properties: Option<Map<String, Value>>,
- #[serde(skip_serializing_if = "Option::is_none")]
- user_properties: Option<Map<String, Value>>,
os_name: &'static str,
os_version: Option<Arc<str>>,
app_version: Option<Arc<str>>,
+ signed_in: bool,
platform: &'static str,
- event_id: usize,
- session_id: u128,
- time: u128,
+}
+
+#[derive(Serialize)]
+struct MixpanelEngageRequest {
+ #[serde(rename = "$token")]
+ token: &'static str,
+ #[serde(rename = "$distinct_id")]
+ distinct_id: Arc<str>,
+ #[serde(rename = "$set")]
+ set: Value,
}
#[cfg(debug_assertions)]
@@ -92,10 +102,6 @@ impl Telemetry {
let this = Arc::new(Self {
http_client: client,
executor: cx.background().clone(),
- session_id: SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .unwrap()
- .as_millis(),
state: Mutex::new(TelemetryState {
os_version: platform
.os_version()
@@ -107,15 +113,15 @@ impl Telemetry {
.log_err()
.map(|v| v.to_string().into()),
device_id: None,
+ metrics_id: None,
queue: Default::default(),
flush_task: Default::default(),
next_event_id: 0,
log_file: None,
- metrics_id: None,
}),
});
- if AMPLITUDE_API_KEY.is_some() {
+ if MIXPANEL_TOKEN.is_some() {
this.executor
.spawn({
let this = this.clone();
@@ -148,11 +154,14 @@ impl Telemetry {
device_id
};
- let device_id = Some(Arc::from(device_id));
+ let device_id: Arc<str> = device_id.into();
let mut state = this.state.lock();
- state.device_id = device_id.clone();
+ state.device_id = Some(device_id.clone());
for event in &mut state.queue {
- event.device_id = device_id.clone();
+ event
+ .properties
+ .distinct_id
+ .get_or_insert_with(|| device_id.clone());
}
if !state.queue.is_empty() {
drop(state);
@@ -171,56 +180,63 @@ impl Telemetry {
metrics_id: Option<String>,
is_staff: bool,
) {
- let is_signed_in = metrics_id.is_some();
- self.state.lock().metrics_id = metrics_id.map(|s| s.into());
- if is_signed_in {
- self.report_event_with_user_properties(
- "$identify",
- Default::default(),
- json!({ "$set": { "staff": is_staff } }),
- )
- }
- }
+ let this = self.clone();
+ let mut state = self.state.lock();
+ let device_id = state.device_id.clone();
+ let metrics_id: Option<Arc<str>> = metrics_id.map(|id| id.into());
+ state.metrics_id = metrics_id.clone();
+ drop(state);
- pub fn report_event(self: &Arc<Self>, kind: &str, properties: Value) {
- self.report_event_with_user_properties(kind, properties, Default::default());
- }
+ if let Some((token, device_id)) = MIXPANEL_TOKEN.as_ref().zip(device_id) {
+ self.executor
+ .spawn(
+ async move {
+ let json_bytes = serde_json::to_vec(&[MixpanelEngageRequest {
+ token,
+ distinct_id: device_id,
+ set: json!({ "staff": is_staff, "id": metrics_id }),
+ }])?;
- fn report_event_with_user_properties(
- self: &Arc<Self>,
- kind: &str,
- properties: Value,
- user_properties: Value,
- ) {
- if AMPLITUDE_API_KEY.is_none() {
- return;
+ eprintln!("request: {}", std::str::from_utf8(&json_bytes).unwrap());
+
+ let request = Request::post(MIXPANEL_ENGAGE_URL)
+ .header("Content-Type", "application/json")
+ .body(json_bytes.into())?;
+ let response = this.http_client.send(request).await?;
+
+ eprintln!("response: {:?} {:?}", response.status(), response.body());
+
+ Ok(())
+ }
+ .log_err(),
+ )
+ .detach();
}
+ }
+ pub fn report_event(self: &Arc<Self>, kind: &str, properties: Value) {
let mut state = self.state.lock();
- let event = AmplitudeEvent {
- event_type: kind.to_string(),
- time: SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .unwrap()
- .as_millis(),
- session_id: self.session_id,
- event_properties: if let Value::Object(properties) = properties {
- Some(properties)
- } else {
- None
- },
- user_properties: if let Value::Object(user_properties) = user_properties {
- Some(user_properties)
- } else {
- None
+ let event = MixpanelEvent {
+ event: kind.to_string(),
+ properties: MixpanelEventProperties {
+ token: "",
+ time: SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .unwrap()
+ .as_millis(),
+ distinct_id: state.device_id.clone(),
+ insert_id: post_inc(&mut state.next_event_id),
+ event_properties: if let Value::Object(properties) = properties {
+ Some(properties)
+ } else {
+ None
+ },
+ os_name: state.os_name,
+ os_version: state.os_version.clone(),
+ app_version: state.app_version.clone(),
+ signed_in: state.metrics_id.is_some(),
+ platform: "Zed",
},
- user_id: state.metrics_id.clone(),
- device_id: state.device_id.clone(),
- os_name: state.os_name,
- platform: "Zed",
- os_version: state.os_version.clone(),
- app_version: state.app_version.clone(),
- event_id: post_inc(&mut state.next_event_id),
};
state.queue.push(event);
if state.device_id.is_some() {
@@ -240,11 +256,11 @@ impl Telemetry {
fn flush(self: &Arc<Self>) {
let mut state = self.state.lock();
- let events = mem::take(&mut state.queue);
+ let mut events = mem::take(&mut state.queue);
state.flush_task.take();
drop(state);
- if let Some(api_key) = AMPLITUDE_API_KEY.as_ref() {
+ if let Some(token) = MIXPANEL_TOKEN.as_ref() {
let this = self.clone();
self.executor
.spawn(
@@ -253,20 +269,28 @@ impl Telemetry {
if let Some(file) = &mut this.state.lock().log_file {
let file = file.as_file_mut();
- for event in &events {
+ for event in &mut events {
json_bytes.clear();
serde_json::to_writer(&mut json_bytes, event)?;
file.write_all(&json_bytes)?;
file.write(b"\n")?;
+
+ event.properties.token = token;
}
}
- let batch = AmplitudeEventBatch { api_key, events };
json_bytes.clear();
- serde_json::to_writer(&mut json_bytes, &batch)?;
- let request =
- Request::post(AMPLITUDE_EVENTS_URL).body(json_bytes.into())?;
- this.http_client.send(request).await?;
+ serde_json::to_writer(&mut json_bytes, &events)?;
+
+ eprintln!("request: {}", std::str::from_utf8(&json_bytes).unwrap());
+
+ let request = Request::post(MIXPANEL_EVENTS_URL)
+ .header("Content-Type", "application/json")
+ .body(json_bytes.into())?;
+ let response = this.http_client.send(request).await?;
+
+ eprintln!("response: {:?} {:?}", response.status(), response.body());
+
Ok(())
}
.log_err(),