1use crate::{TelemetrySettings, ZED_SECRET_CLIENT_TOKEN, ZED_SERVER_URL};
2use db::kvp::KEY_VALUE_STORE;
3use gpui::{
4 executor::Background,
5 serde_json::{self, value::Map, Value},
6 AppContext, Task,
7};
8use lazy_static::lazy_static;
9use parking_lot::Mutex;
10use serde::Serialize;
11use serde_json::json;
12use std::{
13 io::Write,
14 mem,
15 path::PathBuf,
16 sync::Arc,
17 time::{Duration, SystemTime, UNIX_EPOCH},
18};
19use tempfile::NamedTempFile;
20use util::http::HttpClient;
21use util::{channel::ReleaseChannel, post_inc, ResultExt, TryFutureExt};
22use uuid::Uuid;
23
/// Collects telemetry events and uploads them over HTTP.
///
/// All mutable data lives in `state`; uploads, debounce timers and startup
/// work are spawned on `executor`.
pub struct Telemetry {
    // Client used for every outgoing POST request.
    http_client: Arc<dyn HttpClient>,
    // Executor on which flushes, timers and startup work are spawned.
    executor: Arc<Background>,
    // Identifiers, queued events and pending flush tasks, guarded by a mutex.
    state: Mutex<TelemetryState>,
}
29
/// Mutable telemetry state; accessed through the `state` mutex on `Telemetry`.
#[derive(Default)]
struct TelemetryState {
    metrics_id: Option<Arc<str>>,      // Per logged-in user
    installation_id: Option<Arc<str>>, // Per app installation
    // Platform/app metadata captured once in `Telemetry::new`.
    app_version: Option<Arc<str>>,
    release_channel: Option<&'static str>,
    os_version: Option<Arc<str>>,
    os_name: &'static str,
    // Events waiting to be uploaded; emptied by the corresponding flush method.
    mixpanel_events_queue: Vec<MixpanelEvent>,
    clickhouse_events_queue: Vec<ClickhouseEventWrapper>,
    // Counter from which each Mixpanel event takes its `$insert_id`.
    next_mixpanel_event_id: usize,
    // Pending delayed flushes: set when an event is queued below the size
    // threshold, cleared when the queue is flushed.
    flush_mixpanel_events_task: Option<Task<()>>,
    flush_clickhouse_events_task: Option<Task<()>>,
    // Temporary file that flushed events are appended to, one JSON value per line.
    log_file: Option<NamedTempFile>,
    // Set by `set_authenticated_user_info`; `None` until then.
    is_staff: Option<bool>,
}
46
/// Endpoint that receives batches of Mixpanel events.
const MIXPANEL_EVENTS_URL: &str = "https://api.mixpanel.com/track";
/// Endpoint that receives Mixpanel profile updates.
const MIXPANEL_ENGAGE_URL: &str = "https://api.mixpanel.com/engage#profile-set";
/// Path appended to `ZED_SERVER_URL` to build the Clickhouse events endpoint.
const CLICKHOUSE_EVENTS_URL_PATH: &str = "/api/events";
50
51lazy_static! {
52 static ref MIXPANEL_TOKEN: Option<String> = std::env::var("ZED_MIXPANEL_TOKEN")
53 .ok()
54 .or_else(|| option_env!("ZED_MIXPANEL_TOKEN").map(|key| key.to_string()));
55 static ref CLICKHOUSE_EVENTS_URL: String =
56 format!("{}{}", *ZED_SERVER_URL, CLICKHOUSE_EVENTS_URL_PATH);
57}
58
/// JSON body posted to `CLICKHOUSE_EVENTS_URL`: installation-level metadata
/// sent once per request, plus the batch of queued events.
#[derive(Serialize, Debug)]
struct ClickhouseEventRequestBody {
    // Filled with `ZED_SECRET_CLIENT_TOKEN` when the body is built.
    token: &'static str,
    installation_id: Option<Arc<str>>,
    app_version: Option<Arc<str>>,
    os_name: &'static str,
    os_version: Option<Arc<str>>,
    release_channel: Option<&'static str>,
    // Every event drained from the Clickhouse queue for this flush.
    events: Vec<ClickhouseEventWrapper>,
}
69
/// A queued Clickhouse event together with the context captured when it was reported.
#[derive(Serialize, Debug)]
struct ClickhouseEventWrapper {
    // Milliseconds since the Unix epoch at the time the event was reported.
    time: u128,
    // Whether a metrics id was set when the event was reported.
    signed_in: bool,
    // Serialized inline: the event's own keys appear next to `time` and `signed_in`.
    #[serde(flatten)]
    event: ClickhouseEvent,
}
77
/// Events that can be reported to Clickhouse.
///
/// Serialized with the variant name stored under a `"type"` key alongside the
/// variant's fields.
#[derive(Serialize, Debug)]
#[serde(tag = "type")]
pub enum ClickhouseEvent {
    // NOTE(review): field meanings below are inferred from their names only —
    // confirm against the call sites that construct this variant.
    Editor {
        // Presumably the name of the editor action being reported.
        operation: &'static str,
        // Presumably the extension of the file involved, when there is one.
        file_extension: Option<String>,
        vim_mode: bool,
        copilot_enabled: bool,
        copilot_enabled_for_language: bool,
    },
}
89
/// A single Mixpanel event as serialized into the log file and the upload body.
#[derive(Serialize, Debug)]
struct MixpanelEvent {
    // Event name, taken from the `kind` passed to `report_mixpanel_event`.
    event: String,
    // Required Mixpanel fields plus app metadata and caller-supplied properties.
    properties: MixpanelEventProperties,
}
95
/// Properties attached to every Mixpanel event.
#[derive(Serialize, Debug)]
struct MixpanelEventProperties {
    // Mixpanel required fields
    // Created empty and filled in when the event is flushed; omitted from the
    // serialized output while it is still empty.
    #[serde(skip_serializing_if = "str::is_empty")]
    token: &'static str,
    // Milliseconds since the Unix epoch at the time the event was reported.
    time: u128,
    // May be `None` for events queued before `start` has loaded the id; `start`
    // back-fills it on queued events.
    #[serde(rename = "distinct_id")]
    installation_id: Option<Arc<str>>,
    // Taken from the `next_mixpanel_event_id` counter in the telemetry state.
    #[serde(rename = "$insert_id")]
    insert_id: usize,
    // Custom fields
    // Caller-supplied JSON object, serialized inline with the other properties;
    // `None` when the caller passed a non-object value.
    #[serde(skip_serializing_if = "Option::is_none", flatten)]
    event_properties: Option<Map<String, Value>>,
    #[serde(rename = "OS Name")]
    os_name: &'static str,
    #[serde(rename = "OS Version")]
    os_version: Option<Arc<str>>,
    #[serde(rename = "Release Channel")]
    release_channel: Option<&'static str>,
    #[serde(rename = "App Version")]
    app_version: Option<Arc<str>>,
    // Whether a metrics id was set when the event was reported.
    #[serde(rename = "Signed In")]
    signed_in: bool,
}
120
/// Profile update posted to `MIXPANEL_ENGAGE_URL` by
/// `set_authenticated_user_info` (sent as a one-element JSON array).
#[derive(Serialize)]
struct MixpanelEngageRequest {
    #[serde(rename = "$token")]
    token: &'static str,
    #[serde(rename = "$distinct_id")]
    installation_id: Arc<str>,
    // JSON object of profile properties to set.
    #[serde(rename = "$set")]
    set: Value,
}
130
/// Queue length at which queued events are flushed immediately instead of
/// waiting for the debounce timer: 1 in debug builds, 10 otherwise.
const MAX_QUEUE_LEN: usize = if cfg!(debug_assertions) { 1 } else { 10 };
136
/// Delay before a queue that is below `MAX_QUEUE_LEN` is flushed:
/// 1 second in debug builds, 30 seconds otherwise.
const DEBOUNCE_INTERVAL: Duration =
    Duration::from_secs(if cfg!(debug_assertions) { 1 } else { 30 });
142
143impl Telemetry {
144 pub fn new(client: Arc<dyn HttpClient>, cx: &AppContext) -> Arc<Self> {
145 let platform = cx.platform();
146 let release_channel = if cx.has_global::<ReleaseChannel>() {
147 Some(cx.global::<ReleaseChannel>().display_name())
148 } else {
149 None
150 };
151 let this = Arc::new(Self {
152 http_client: client,
153 executor: cx.background().clone(),
154 state: Mutex::new(TelemetryState {
155 os_version: platform.os_version().ok().map(|v| v.to_string().into()),
156 os_name: platform.os_name().into(),
157 app_version: platform.app_version().ok().map(|v| v.to_string().into()),
158 release_channel,
159 installation_id: None,
160 metrics_id: None,
161 mixpanel_events_queue: Default::default(),
162 clickhouse_events_queue: Default::default(),
163 flush_mixpanel_events_task: Default::default(),
164 flush_clickhouse_events_task: Default::default(),
165 next_mixpanel_event_id: 0,
166 log_file: None,
167 is_staff: None,
168 }),
169 });
170
171 if MIXPANEL_TOKEN.is_some() {
172 this.executor
173 .spawn({
174 let this = this.clone();
175 async move {
176 if let Some(tempfile) = NamedTempFile::new().log_err() {
177 this.state.lock().log_file = Some(tempfile);
178 }
179 }
180 })
181 .detach();
182 }
183
184 this
185 }
186
187 pub fn log_file_path(&self) -> Option<PathBuf> {
188 Some(self.state.lock().log_file.as_ref()?.path().to_path_buf())
189 }
190
191 pub fn start(self: &Arc<Self>) {
192 let this = self.clone();
193 self.executor
194 .spawn(
195 async move {
196 let installation_id =
197 if let Ok(Some(installation_id)) = KEY_VALUE_STORE.read_kvp("device_id") {
198 installation_id
199 } else {
200 let installation_id = Uuid::new_v4().to_string();
201 KEY_VALUE_STORE
202 .write_kvp("device_id".to_string(), installation_id.clone())
203 .await?;
204 installation_id
205 };
206
207 let installation_id: Arc<str> = installation_id.into();
208 let mut state = this.state.lock();
209 state.installation_id = Some(installation_id.clone());
210
211 for event in &mut state.mixpanel_events_queue {
212 event
213 .properties
214 .installation_id
215 .get_or_insert_with(|| installation_id.clone());
216 }
217
218 let has_mixpanel_events = !state.mixpanel_events_queue.is_empty();
219 let has_clickhouse_events = !state.clickhouse_events_queue.is_empty();
220 drop(state);
221
222 if has_mixpanel_events {
223 this.flush_mixpanel_events();
224 }
225
226 if has_clickhouse_events {
227 this.flush_clickhouse_events();
228 }
229
230 anyhow::Ok(())
231 }
232 .log_err(),
233 )
234 .detach();
235 }
236
237 /// This method takes the entire TelemetrySettings struct in order to force client code
238 /// to pull the struct out of the settings global. Do not remove!
239 pub fn set_authenticated_user_info(
240 self: &Arc<Self>,
241 metrics_id: Option<String>,
242 is_staff: bool,
243 cx: &AppContext,
244 ) {
245 if !settings::get_setting::<TelemetrySettings>(None, cx).metrics {
246 return;
247 }
248
249 let this = self.clone();
250 let mut state = self.state.lock();
251 let installation_id = state.installation_id.clone();
252 let metrics_id: Option<Arc<str>> = metrics_id.map(|id| id.into());
253 state.metrics_id = metrics_id.clone();
254 state.is_staff = Some(is_staff);
255 drop(state);
256
257 if let Some((token, installation_id)) = MIXPANEL_TOKEN.as_ref().zip(installation_id) {
258 self.executor
259 .spawn(
260 async move {
261 let json_bytes = serde_json::to_vec(&[MixpanelEngageRequest {
262 token,
263 installation_id,
264 set: json!({
265 "Staff": is_staff,
266 "ID": metrics_id,
267 "App": true
268 }),
269 }])?;
270
271 this.http_client
272 .post_json(MIXPANEL_ENGAGE_URL, json_bytes.into())
273 .await?;
274 anyhow::Ok(())
275 }
276 .log_err(),
277 )
278 .detach();
279 }
280 }
281
282 pub fn report_clickhouse_event(
283 self: &Arc<Self>,
284 event: ClickhouseEvent,
285 telemetry_settings: TelemetrySettings,
286 ) {
287 if !telemetry_settings.metrics {
288 return;
289 }
290
291 let mut state = self.state.lock();
292 let signed_in = state.metrics_id.is_some();
293 state.clickhouse_events_queue.push(ClickhouseEventWrapper {
294 time: SystemTime::now()
295 .duration_since(UNIX_EPOCH)
296 .unwrap()
297 .as_millis(),
298 signed_in,
299 event,
300 });
301
302 if state.installation_id.is_some() {
303 if state.mixpanel_events_queue.len() >= MAX_QUEUE_LEN {
304 drop(state);
305 self.flush_clickhouse_events();
306 } else {
307 let this = self.clone();
308 let executor = self.executor.clone();
309 state.flush_clickhouse_events_task = Some(self.executor.spawn(async move {
310 executor.timer(DEBOUNCE_INTERVAL).await;
311 this.flush_clickhouse_events();
312 }));
313 }
314 }
315 }
316
317 pub fn report_mixpanel_event(
318 self: &Arc<Self>,
319 kind: &str,
320 properties: Value,
321 telemetry_settings: TelemetrySettings,
322 ) {
323 if !telemetry_settings.metrics {
324 return;
325 }
326
327 let mut state = self.state.lock();
328 let event = MixpanelEvent {
329 event: kind.into(),
330 properties: MixpanelEventProperties {
331 token: "",
332 time: SystemTime::now()
333 .duration_since(UNIX_EPOCH)
334 .unwrap()
335 .as_millis(),
336 installation_id: state.installation_id.clone(),
337 insert_id: post_inc(&mut state.next_mixpanel_event_id),
338 event_properties: if let Value::Object(properties) = properties {
339 Some(properties)
340 } else {
341 None
342 },
343 os_name: state.os_name,
344 os_version: state.os_version.clone(),
345 release_channel: state.release_channel,
346 app_version: state.app_version.clone(),
347 signed_in: state.metrics_id.is_some(),
348 },
349 };
350 state.mixpanel_events_queue.push(event);
351 if state.installation_id.is_some() {
352 if state.mixpanel_events_queue.len() >= MAX_QUEUE_LEN {
353 drop(state);
354 self.flush_mixpanel_events();
355 } else {
356 let this = self.clone();
357 let executor = self.executor.clone();
358 state.flush_mixpanel_events_task = Some(self.executor.spawn(async move {
359 executor.timer(DEBOUNCE_INTERVAL).await;
360 this.flush_mixpanel_events();
361 }));
362 }
363 }
364 }
365
366 pub fn metrics_id(self: &Arc<Self>) -> Option<Arc<str>> {
367 self.state.lock().metrics_id.clone()
368 }
369
370 pub fn installation_id(self: &Arc<Self>) -> Option<Arc<str>> {
371 self.state.lock().installation_id.clone()
372 }
373
374 pub fn is_staff(self: &Arc<Self>) -> Option<bool> {
375 self.state.lock().is_staff
376 }
377
378 fn flush_mixpanel_events(self: &Arc<Self>) {
379 let mut state = self.state.lock();
380 let mut events = mem::take(&mut state.mixpanel_events_queue);
381 state.flush_mixpanel_events_task.take();
382 drop(state);
383
384 if let Some(token) = MIXPANEL_TOKEN.as_ref() {
385 let this = self.clone();
386 self.executor
387 .spawn(
388 async move {
389 let mut json_bytes = Vec::new();
390
391 if let Some(file) = &mut this.state.lock().log_file {
392 let file = file.as_file_mut();
393 for event in &mut events {
394 json_bytes.clear();
395 serde_json::to_writer(&mut json_bytes, event)?;
396 file.write_all(&json_bytes)?;
397 file.write(b"\n")?;
398
399 event.properties.token = token;
400 }
401 }
402
403 json_bytes.clear();
404 serde_json::to_writer(&mut json_bytes, &events)?;
405 this.http_client
406 .post_json(MIXPANEL_EVENTS_URL, json_bytes.into())
407 .await?;
408 anyhow::Ok(())
409 }
410 .log_err(),
411 )
412 .detach();
413 }
414 }
415
416 fn flush_clickhouse_events(self: &Arc<Self>) {
417 let mut state = self.state.lock();
418 let mut events = mem::take(&mut state.clickhouse_events_queue);
419 state.flush_clickhouse_events_task.take();
420 drop(state);
421
422 let this = self.clone();
423 self.executor
424 .spawn(
425 async move {
426 let mut json_bytes = Vec::new();
427
428 if let Some(file) = &mut this.state.lock().log_file {
429 let file = file.as_file_mut();
430 for event in &mut events {
431 json_bytes.clear();
432 serde_json::to_writer(&mut json_bytes, event)?;
433 file.write_all(&json_bytes)?;
434 file.write(b"\n")?;
435 }
436 }
437
438 {
439 let state = this.state.lock();
440 json_bytes.clear();
441 serde_json::to_writer(
442 &mut json_bytes,
443 &ClickhouseEventRequestBody {
444 token: ZED_SECRET_CLIENT_TOKEN,
445 installation_id: state.installation_id.clone(),
446 app_version: state.app_version.clone(),
447 os_name: state.os_name,
448 os_version: state.os_version.clone(),
449 release_channel: state.release_channel,
450 events,
451 },
452 )?;
453 }
454
455 this.http_client
456 .post_json(CLICKHOUSE_EVENTS_URL.as_str(), json_bytes.into())
457 .await?;
458 anyhow::Ok(())
459 }
460 .log_err(),
461 )
462 .detach();
463 }
464}