1use crate::{ZED_SECRET_CLIENT_TOKEN, ZED_SERVER_URL};
2use db::kvp::KEY_VALUE_STORE;
3use gpui::{
4 executor::Background,
5 serde_json::{self, value::Map, Value},
6 AppContext, Task,
7};
8use lazy_static::lazy_static;
9use parking_lot::Mutex;
10use serde::Serialize;
11use serde_json::json;
12use settings::TelemetrySettings;
13use std::{
14 io::Write,
15 mem,
16 path::PathBuf,
17 sync::Arc,
18 time::{Duration, SystemTime, UNIX_EPOCH},
19};
20use tempfile::NamedTempFile;
21use util::http::HttpClient;
22use util::{channel::ReleaseChannel, post_inc, ResultExt, TryFutureExt};
23use uuid::Uuid;
24
/// Client-side telemetry collector: queues events in memory and flushes them to Mixpanel
/// and to the server's ClickHouse events endpoint from background tasks.
pub struct Telemetry {
    /// HTTP client used for every outgoing `post_json` request.
    http_client: Arc<dyn HttpClient>,
    /// Background executor on which flushes, debounce timers and start-up work are spawned.
    executor: Arc<Background>,
    /// All mutable telemetry state, guarded by a single mutex.
    state: Mutex<TelemetryState>,
}
30
/// Mutable state behind `Telemetry::state`.
#[derive(Default)]
struct TelemetryState {
    /// Set by `set_authenticated_user_info`; events report "signed in" when this is `Some`.
    metrics_id: Option<Arc<str>>,
    /// Loaded from (or generated and written to) the key-value store under `"device_id"` by
    /// `start`. The `report_*` methods only flush once this is set.
    device_id: Option<Arc<str>>,
    /// App version read from the platform in `Telemetry::new`, if available.
    app_version: Option<Arc<str>>,
    /// Display name of the `ReleaseChannel` global, if one was set when `new` ran.
    release_channel: Option<&'static str>,
    /// OS version read from the platform in `Telemetry::new`, if available.
    os_version: Option<Arc<str>>,
    /// OS name read from the platform in `Telemetry::new`.
    os_name: &'static str,
    mixpanel_events_queue: Vec<MixpanelEvent>, // Mixpanel mixed events - will hopefully die soon
    /// ClickHouse events waiting for the next flush.
    clickhouse_events_queue: Vec<ClickhouseEventWrapper>,
    /// Counter that supplies each Mixpanel event's `$insert_id`; post-incremented per event.
    next_mixpanel_event_id: usize,
    /// Pending debounced Mixpanel flush; replaced when another event is queued and taken
    /// when a flush runs.
    flush_mixpanel_events_task: Option<Task<()>>,
    /// Pending debounced ClickHouse flush; replaced when another event is queued and taken
    /// when a flush runs.
    flush_clickhouse_events_task: Option<Task<()>>,
    /// Temp file that flushed events are appended to as JSON lines. Created in the
    /// background by `Telemetry::new`, and only when a Mixpanel token is configured.
    log_file: Option<NamedTempFile>,
    /// Staff flag recorded by `set_authenticated_user_info`; `None` until then.
    is_staff: Option<bool>,
}
47
/// Mixpanel endpoint that batches of tracked events are posted to.
const MIXPANEL_EVENTS_URL: &str = "https://api.mixpanel.com/track";
/// Mixpanel endpoint that user-profile (`$set`) updates are posted to.
const MIXPANEL_ENGAGE_URL: &str = "https://api.mixpanel.com/engage#profile-set";
/// Path appended to the server URL to form the ClickHouse events endpoint.
const CLICKHOUSE_EVENTS_URL_PATH: &str = "/api/events";
51
52lazy_static! {
53 static ref MIXPANEL_TOKEN: Option<String> = std::env::var("ZED_MIXPANEL_TOKEN")
54 .ok()
55 .or_else(|| option_env!("ZED_MIXPANEL_TOKEN").map(|key| key.to_string()));
56 static ref CLICKHOUSE_EVENTS_URL: String =
57 format!("{}{}", *ZED_SERVER_URL, CLICKHOUSE_EVENTS_URL_PATH);
58}
59
/// JSON body posted to `CLICKHOUSE_EVENTS_URL` by `flush_clickhouse_events`: one batch of
/// events plus the installation/platform metadata shared by all of them.
#[derive(Serialize, Debug)]
struct ClickhouseEventRequestBody {
    /// Filled with `ZED_SECRET_CLIENT_TOKEN`.
    token: &'static str,
    /// The device id held in the telemetry state at flush time.
    installation_id: Option<Arc<str>>,
    app_version: Option<Arc<str>>,
    os_name: &'static str,
    os_version: Option<Arc<str>>,
    release_channel: Option<&'static str>,
    /// The events drained from the ClickHouse queue for this flush.
    events: Vec<ClickhouseEventWrapper>,
}
70
/// A queued ClickHouse event together with the per-event metadata captured when it was
/// reported.
#[derive(Serialize, Debug)]
struct ClickhouseEventWrapper {
    /// Milliseconds since the Unix epoch at the time the event was reported.
    time: u128,
    /// Whether a metrics id was set when the event was reported.
    signed_in: bool,
    /// The event payload; its fields are flattened into this object when serialized.
    #[serde(flatten)]
    event: ClickhouseEvent,
}
78
/// Event payloads accepted by `Telemetry::report_clickhouse_event`.
///
/// Serialized with a `"type"` field that identifies the variant (serde internal tagging).
#[derive(Serialize, Debug)]
#[serde(tag = "type")]
pub enum ClickhouseEvent {
    /// An editor event.
    // NOTE(review): field meanings below are inferred from their names only; the callers
    // that populate them are outside this file — confirm before relying on them.
    Editor {
        operation: &'static str,
        file_extension: Option<String>,
        vim_mode: bool,
        copilot_enabled: bool,
        copilot_enabled_for_language: bool,
    },
}
90
/// One Mixpanel tracking event as queued by `report_mixpanel_event` and posted (in batches)
/// to `MIXPANEL_EVENTS_URL`.
#[derive(Serialize, Debug)]
struct MixpanelEvent {
    /// Event name; the `kind` argument passed to `report_mixpanel_event`.
    event: String,
    /// Required Mixpanel fields plus custom properties.
    properties: MixpanelEventProperties,
}
96
/// The `properties` object of a Mixpanel event.
#[derive(Serialize, Debug)]
struct MixpanelEventProperties {
    // Mixpanel required fields
    /// Project token. Events are created with an empty token, which is omitted from the
    /// serialized output; `flush_mixpanel_events` is where the real token is assigned.
    #[serde(skip_serializing_if = "str::is_empty")]
    token: &'static str,
    /// Milliseconds since the Unix epoch at the time the event was reported.
    time: u128,
    /// The device id. `None` for events reported before `start` has determined the device
    /// id; `start` backfills those queued events.
    distinct_id: Option<Arc<str>>,
    /// Per-process sequence number taken from `next_mixpanel_event_id`.
    #[serde(rename = "$insert_id")]
    insert_id: usize,
    // Custom fields
    /// Caller-supplied properties, flattened into this object. `None` (and omitted) when
    /// the caller passed a JSON value that is not an object.
    #[serde(skip_serializing_if = "Option::is_none", flatten)]
    event_properties: Option<Map<String, Value>>,
    #[serde(rename = "OS Name")]
    os_name: &'static str,
    #[serde(rename = "OS Version")]
    os_version: Option<Arc<str>>,
    #[serde(rename = "Release Channel")]
    release_channel: Option<&'static str>,
    #[serde(rename = "App Version")]
    app_version: Option<Arc<str>>,
    /// Whether a metrics id was set when the event was reported.
    #[serde(rename = "Signed In")]
    signed_in: bool,
}
120
/// Profile update posted to `MIXPANEL_ENGAGE_URL` by `set_authenticated_user_info`.
#[derive(Serialize)]
struct MixpanelEngageRequest {
    /// Mixpanel project token.
    #[serde(rename = "$token")]
    token: &'static str,
    /// The device id identifying the profile to update.
    #[serde(rename = "$distinct_id")]
    distinct_id: Arc<str>,
    /// JSON object of profile properties to set.
    #[serde(rename = "$set")]
    set: Value,
}
130
/// Queue length at which a report triggers an immediate flush instead of a debounced one:
/// 1 in builds with debug assertions, 10 otherwise.
const MAX_QUEUE_LEN: usize = if cfg!(debug_assertions) { 1 } else { 10 };

/// Delay before a debounced flush runs: 1 second in builds with debug assertions,
/// 30 seconds otherwise.
const DEBOUNCE_INTERVAL: Duration =
    Duration::from_secs(if cfg!(debug_assertions) { 1 } else { 30 });
142
143impl Telemetry {
144 pub fn new(client: Arc<dyn HttpClient>, cx: &AppContext) -> Arc<Self> {
145 let platform = cx.platform();
146 let release_channel = if cx.has_global::<ReleaseChannel>() {
147 Some(cx.global::<ReleaseChannel>().display_name())
148 } else {
149 None
150 };
151 let this = Arc::new(Self {
152 http_client: client,
153 executor: cx.background().clone(),
154 state: Mutex::new(TelemetryState {
155 os_version: platform.os_version().ok().map(|v| v.to_string().into()),
156 os_name: platform.os_name().into(),
157 app_version: platform.app_version().ok().map(|v| v.to_string().into()),
158 release_channel,
159 device_id: None,
160 metrics_id: None,
161 mixpanel_events_queue: Default::default(),
162 clickhouse_events_queue: Default::default(),
163 flush_mixpanel_events_task: Default::default(),
164 flush_clickhouse_events_task: Default::default(),
165 next_mixpanel_event_id: 0,
166 log_file: None,
167 is_staff: None,
168 }),
169 });
170
171 if MIXPANEL_TOKEN.is_some() {
172 this.executor
173 .spawn({
174 let this = this.clone();
175 async move {
176 if let Some(tempfile) = NamedTempFile::new().log_err() {
177 this.state.lock().log_file = Some(tempfile);
178 }
179 }
180 })
181 .detach();
182 }
183
184 this
185 }
186
187 pub fn log_file_path(&self) -> Option<PathBuf> {
188 Some(self.state.lock().log_file.as_ref()?.path().to_path_buf())
189 }
190
191 pub fn start(self: &Arc<Self>) {
192 let this = self.clone();
193 self.executor
194 .spawn(
195 async move {
196 let device_id =
197 if let Ok(Some(device_id)) = KEY_VALUE_STORE.read_kvp("device_id") {
198 device_id
199 } else {
200 let device_id = Uuid::new_v4().to_string();
201 KEY_VALUE_STORE
202 .write_kvp("device_id".to_string(), device_id.clone())
203 .await?;
204 device_id
205 };
206
207 let device_id: Arc<str> = device_id.into();
208 let mut state = this.state.lock();
209 state.device_id = Some(device_id.clone());
210
211 for event in &mut state.mixpanel_events_queue {
212 event
213 .properties
214 .distinct_id
215 .get_or_insert_with(|| device_id.clone());
216 }
217
218 let has_mixpanel_events = !state.mixpanel_events_queue.is_empty();
219 let has_clickhouse_events = !state.clickhouse_events_queue.is_empty();
220 drop(state);
221
222 if has_mixpanel_events {
223 this.flush_mixpanel_events();
224 }
225
226 if has_clickhouse_events {
227 this.flush_clickhouse_events();
228 }
229
230 anyhow::Ok(())
231 }
232 .log_err(),
233 )
234 .detach();
235 }
236
237 /// This method takes the entire TelemetrySettings struct in order to force client code
238 /// to pull the struct out of the settings global. Do not remove!
239 pub fn set_authenticated_user_info(
240 self: &Arc<Self>,
241 metrics_id: Option<String>,
242 is_staff: bool,
243 telemetry_settings: TelemetrySettings,
244 ) {
245 if !telemetry_settings.metrics() {
246 return;
247 }
248
249 let this = self.clone();
250 let mut state = self.state.lock();
251 let device_id = state.device_id.clone();
252 let metrics_id: Option<Arc<str>> = metrics_id.map(|id| id.into());
253 state.metrics_id = metrics_id.clone();
254 state.is_staff = Some(is_staff);
255 drop(state);
256
257 if let Some((token, device_id)) = MIXPANEL_TOKEN.as_ref().zip(device_id) {
258 self.executor
259 .spawn(
260 async move {
261 let json_bytes = serde_json::to_vec(&[MixpanelEngageRequest {
262 token,
263 distinct_id: device_id,
264 set: json!({
265 "Staff": is_staff,
266 "ID": metrics_id,
267 "App": true
268 }),
269 }])?;
270
271 this.http_client
272 .post_json(MIXPANEL_ENGAGE_URL, json_bytes.into())
273 .await?;
274 anyhow::Ok(())
275 }
276 .log_err(),
277 )
278 .detach();
279 }
280 }
281
282 pub fn report_clickhouse_event(
283 self: &Arc<Self>,
284 event: ClickhouseEvent,
285 telemetry_settings: TelemetrySettings,
286 ) {
287 if !telemetry_settings.metrics() {
288 return;
289 }
290
291 let mut state = self.state.lock();
292 let signed_in = state.metrics_id.is_some();
293 state.clickhouse_events_queue.push(ClickhouseEventWrapper {
294 time: SystemTime::now()
295 .duration_since(UNIX_EPOCH)
296 .unwrap()
297 .as_millis(),
298 signed_in,
299 event,
300 });
301
302 if state.device_id.is_some() {
303 if state.mixpanel_events_queue.len() >= MAX_QUEUE_LEN {
304 drop(state);
305 self.flush_clickhouse_events();
306 } else {
307 let this = self.clone();
308 let executor = self.executor.clone();
309 state.flush_clickhouse_events_task = Some(self.executor.spawn(async move {
310 executor.timer(DEBOUNCE_INTERVAL).await;
311 this.flush_clickhouse_events();
312 }));
313 }
314 }
315 }
316
317 pub fn report_mixpanel_event(
318 self: &Arc<Self>,
319 kind: &str,
320 properties: Value,
321 telemetry_settings: TelemetrySettings,
322 ) {
323 if !telemetry_settings.metrics() {
324 return;
325 }
326
327 let mut state = self.state.lock();
328 let event = MixpanelEvent {
329 event: kind.into(),
330 properties: MixpanelEventProperties {
331 token: "",
332 time: SystemTime::now()
333 .duration_since(UNIX_EPOCH)
334 .unwrap()
335 .as_millis(),
336 distinct_id: state.device_id.clone(),
337 insert_id: post_inc(&mut state.next_mixpanel_event_id),
338 event_properties: if let Value::Object(properties) = properties {
339 Some(properties)
340 } else {
341 None
342 },
343 os_name: state.os_name,
344 os_version: state.os_version.clone(),
345 release_channel: state.release_channel,
346 app_version: state.app_version.clone(),
347 signed_in: state.metrics_id.is_some(),
348 },
349 };
350 state.mixpanel_events_queue.push(event);
351 if state.device_id.is_some() {
352 if state.mixpanel_events_queue.len() >= MAX_QUEUE_LEN {
353 drop(state);
354 self.flush_mixpanel_events();
355 } else {
356 let this = self.clone();
357 let executor = self.executor.clone();
358 state.flush_mixpanel_events_task = Some(self.executor.spawn(async move {
359 executor.timer(DEBOUNCE_INTERVAL).await;
360 this.flush_mixpanel_events();
361 }));
362 }
363 }
364 }
365
366 pub fn metrics_id(self: &Arc<Self>) -> Option<Arc<str>> {
367 self.state.lock().metrics_id.clone()
368 }
369
370 pub fn is_staff(self: &Arc<Self>) -> Option<bool> {
371 self.state.lock().is_staff
372 }
373
374 fn flush_mixpanel_events(self: &Arc<Self>) {
375 let mut state = self.state.lock();
376 let mut events = mem::take(&mut state.mixpanel_events_queue);
377 state.flush_mixpanel_events_task.take();
378 drop(state);
379
380 if let Some(token) = MIXPANEL_TOKEN.as_ref() {
381 let this = self.clone();
382 self.executor
383 .spawn(
384 async move {
385 let mut json_bytes = Vec::new();
386
387 if let Some(file) = &mut this.state.lock().log_file {
388 let file = file.as_file_mut();
389 for event in &mut events {
390 json_bytes.clear();
391 serde_json::to_writer(&mut json_bytes, event)?;
392 file.write_all(&json_bytes)?;
393 file.write(b"\n")?;
394
395 event.properties.token = token;
396 }
397 }
398
399 json_bytes.clear();
400 serde_json::to_writer(&mut json_bytes, &events)?;
401 this.http_client
402 .post_json(MIXPANEL_EVENTS_URL, json_bytes.into())
403 .await?;
404 anyhow::Ok(())
405 }
406 .log_err(),
407 )
408 .detach();
409 }
410 }
411
412 fn flush_clickhouse_events(self: &Arc<Self>) {
413 let mut state = self.state.lock();
414 let mut events = mem::take(&mut state.clickhouse_events_queue);
415 state.flush_clickhouse_events_task.take();
416 drop(state);
417
418 let this = self.clone();
419 self.executor
420 .spawn(
421 async move {
422 let mut json_bytes = Vec::new();
423
424 if let Some(file) = &mut this.state.lock().log_file {
425 let file = file.as_file_mut();
426 for event in &mut events {
427 json_bytes.clear();
428 serde_json::to_writer(&mut json_bytes, event)?;
429 file.write_all(&json_bytes)?;
430 file.write(b"\n")?;
431 }
432 }
433
434 {
435 let state = this.state.lock();
436 json_bytes.clear();
437 serde_json::to_writer(
438 &mut json_bytes,
439 &ClickhouseEventRequestBody {
440 token: ZED_SECRET_CLIENT_TOKEN,
441 installation_id: state.device_id.clone(),
442 app_version: state.app_version.clone(),
443 os_name: state.os_name,
444 os_version: state.os_version.clone(),
445 release_channel: state.release_channel,
446 events,
447 },
448 )?;
449 }
450
451 this.http_client
452 .post_json(CLICKHOUSE_EVENTS_URL.as_str(), json_bytes.into())
453 .await?;
454 anyhow::Ok(())
455 }
456 .log_err(),
457 )
458 .detach();
459 }
460}