1use crate::{ZED_SECRET_CLIENT_TOKEN, ZED_SERVER_URL};
2use db::kvp::KEY_VALUE_STORE;
3use gpui::{
4 executor::Background,
5 serde_json::{self, value::Map, Value},
6 AppContext, Task,
7};
8use lazy_static::lazy_static;
9use parking_lot::Mutex;
10use serde::Serialize;
11use serde_json::json;
12use settings::TelemetrySettings;
13use std::{
14 io::Write,
15 mem,
16 path::PathBuf,
17 sync::Arc,
18 time::{Duration, SystemTime, UNIX_EPOCH},
19};
20use tempfile::NamedTempFile;
21use util::http::HttpClient;
22use util::{channel::ReleaseChannel, post_inc, ResultExt, TryFutureExt};
23use uuid::Uuid;
24
25pub struct Telemetry {
26 http_client: Arc<dyn HttpClient>,
27 executor: Arc<Background>,
28 state: Mutex<TelemetryState>,
29}
30
31#[derive(Default)]
32struct TelemetryState {
33 metrics_id: Option<Arc<str>>, // Per logged-in user
34 installation_id: Option<Arc<str>>, // Per app installation
35 app_version: Option<Arc<str>>,
36 release_channel: Option<&'static str>,
37 os_version: Option<Arc<str>>,
38 os_name: &'static str,
39 mixpanel_events_queue: Vec<MixpanelEvent>,
40 clickhouse_events_queue: Vec<ClickhouseEventWrapper>,
41 next_mixpanel_event_id: usize,
42 flush_mixpanel_events_task: Option<Task<()>>,
43 flush_clickhouse_events_task: Option<Task<()>>,
44 log_file: Option<NamedTempFile>,
45 is_staff: Option<bool>,
46}
47
/// Endpoint that batches of Mixpanel events are POSTed to.
const MIXPANEL_EVENTS_URL: &str = "https://api.mixpanel.com/track";
/// Endpoint that Mixpanel profile (`$set`) updates are POSTed to.
const MIXPANEL_ENGAGE_URL: &str = "https://api.mixpanel.com/engage#profile-set";
/// Path appended to the Zed server URL to form the ClickHouse events endpoint.
const CLICKHOUSE_EVENTS_URL_PATH: &str = "/api/events";
51
52lazy_static! {
53 static ref MIXPANEL_TOKEN: Option<String> = std::env::var("ZED_MIXPANEL_TOKEN")
54 .ok()
55 .or_else(|| option_env!("ZED_MIXPANEL_TOKEN").map(|key| key.to_string()));
56 static ref CLICKHOUSE_EVENTS_URL: String =
57 format!("{}{}", *ZED_SERVER_URL, CLICKHOUSE_EVENTS_URL_PATH);
58}
59
60#[derive(Serialize, Debug)]
61struct ClickhouseEventRequestBody {
62 token: &'static str,
63 installation_id: Option<Arc<str>>,
64 app_version: Option<Arc<str>>,
65 os_name: &'static str,
66 os_version: Option<Arc<str>>,
67 release_channel: Option<&'static str>,
68 events: Vec<ClickhouseEventWrapper>,
69}
70
71#[derive(Serialize, Debug)]
72struct ClickhouseEventWrapper {
73 time: u128,
74 signed_in: bool,
75 #[serde(flatten)]
76 event: ClickhouseEvent,
77}
78
79#[derive(Serialize, Debug)]
80#[serde(tag = "type")]
81pub enum ClickhouseEvent {
82 Editor {
83 operation: &'static str,
84 file_extension: Option<String>,
85 vim_mode: bool,
86 copilot_enabled: bool,
87 copilot_enabled_for_language: bool,
88 },
89}
90
91#[derive(Serialize, Debug)]
92struct MixpanelEvent {
93 event: String,
94 properties: MixpanelEventProperties,
95}
96
97#[derive(Serialize, Debug)]
98struct MixpanelEventProperties {
99 // Mixpanel required fields
100 #[serde(skip_serializing_if = "str::is_empty")]
101 token: &'static str,
102 time: u128,
103 #[serde(rename = "distinct_id")]
104 installation_id: Option<Arc<str>>,
105 #[serde(rename = "$insert_id")]
106 insert_id: usize,
107 // Custom fields
108 #[serde(skip_serializing_if = "Option::is_none", flatten)]
109 event_properties: Option<Map<String, Value>>,
110 #[serde(rename = "OS Name")]
111 os_name: &'static str,
112 #[serde(rename = "OS Version")]
113 os_version: Option<Arc<str>>,
114 #[serde(rename = "Release Channel")]
115 release_channel: Option<&'static str>,
116 #[serde(rename = "App Version")]
117 app_version: Option<Arc<str>>,
118 #[serde(rename = "Signed In")]
119 signed_in: bool,
120}
121
122#[derive(Serialize)]
123struct MixpanelEngageRequest {
124 #[serde(rename = "$token")]
125 token: &'static str,
126 #[serde(rename = "$distinct_id")]
127 installation_id: Arc<str>,
128 #[serde(rename = "$set")]
129 set: Value,
130}
131
/// Queue length at which a report triggers an immediate flush instead of scheduling
/// a debounced one: 1 in builds with debug assertions, 10 otherwise.
const MAX_QUEUE_LEN: usize = if cfg!(debug_assertions) { 1 } else { 10 };

/// Delay before a scheduled (debounced) flush runs: 1 second in builds with debug
/// assertions, 30 seconds otherwise.
const DEBOUNCE_INTERVAL: Duration =
    Duration::from_secs(if cfg!(debug_assertions) { 1 } else { 30 });
143
144impl Telemetry {
145 pub fn new(client: Arc<dyn HttpClient>, cx: &AppContext) -> Arc<Self> {
146 let platform = cx.platform();
147 let release_channel = if cx.has_global::<ReleaseChannel>() {
148 Some(cx.global::<ReleaseChannel>().display_name())
149 } else {
150 None
151 };
152 let this = Arc::new(Self {
153 http_client: client,
154 executor: cx.background().clone(),
155 state: Mutex::new(TelemetryState {
156 os_version: platform.os_version().ok().map(|v| v.to_string().into()),
157 os_name: platform.os_name().into(),
158 app_version: platform.app_version().ok().map(|v| v.to_string().into()),
159 release_channel,
160 installation_id: None,
161 metrics_id: None,
162 mixpanel_events_queue: Default::default(),
163 clickhouse_events_queue: Default::default(),
164 flush_mixpanel_events_task: Default::default(),
165 flush_clickhouse_events_task: Default::default(),
166 next_mixpanel_event_id: 0,
167 log_file: None,
168 is_staff: None,
169 }),
170 });
171
172 if MIXPANEL_TOKEN.is_some() {
173 this.executor
174 .spawn({
175 let this = this.clone();
176 async move {
177 if let Some(tempfile) = NamedTempFile::new().log_err() {
178 this.state.lock().log_file = Some(tempfile);
179 }
180 }
181 })
182 .detach();
183 }
184
185 this
186 }
187
188 pub fn log_file_path(&self) -> Option<PathBuf> {
189 Some(self.state.lock().log_file.as_ref()?.path().to_path_buf())
190 }
191
192 pub fn start(self: &Arc<Self>) {
193 let this = self.clone();
194 self.executor
195 .spawn(
196 async move {
197 let installation_id =
198 if let Ok(Some(installation_id)) = KEY_VALUE_STORE.read_kvp("device_id") {
199 installation_id
200 } else {
201 let installation_id = Uuid::new_v4().to_string();
202 KEY_VALUE_STORE
203 .write_kvp("device_id".to_string(), installation_id.clone())
204 .await?;
205 installation_id
206 };
207
208 let installation_id: Arc<str> = installation_id.into();
209 let mut state = this.state.lock();
210 state.installation_id = Some(installation_id.clone());
211
212 for event in &mut state.mixpanel_events_queue {
213 event
214 .properties
215 .installation_id
216 .get_or_insert_with(|| installation_id.clone());
217 }
218
219 let has_mixpanel_events = !state.mixpanel_events_queue.is_empty();
220 let has_clickhouse_events = !state.clickhouse_events_queue.is_empty();
221 drop(state);
222
223 if has_mixpanel_events {
224 this.flush_mixpanel_events();
225 }
226
227 if has_clickhouse_events {
228 this.flush_clickhouse_events();
229 }
230
231 anyhow::Ok(())
232 }
233 .log_err(),
234 )
235 .detach();
236 }
237
238 /// This method takes the entire TelemetrySettings struct in order to force client code
239 /// to pull the struct out of the settings global. Do not remove!
240 pub fn set_authenticated_user_info(
241 self: &Arc<Self>,
242 metrics_id: Option<String>,
243 is_staff: bool,
244 telemetry_settings: TelemetrySettings,
245 ) {
246 if !telemetry_settings.metrics() {
247 return;
248 }
249
250 let this = self.clone();
251 let mut state = self.state.lock();
252 let installation_id = state.installation_id.clone();
253 let metrics_id: Option<Arc<str>> = metrics_id.map(|id| id.into());
254 state.metrics_id = metrics_id.clone();
255 state.is_staff = Some(is_staff);
256 drop(state);
257
258 if let Some((token, installation_id)) = MIXPANEL_TOKEN.as_ref().zip(installation_id) {
259 self.executor
260 .spawn(
261 async move {
262 let json_bytes = serde_json::to_vec(&[MixpanelEngageRequest {
263 token,
264 installation_id,
265 set: json!({
266 "Staff": is_staff,
267 "ID": metrics_id,
268 "App": true
269 }),
270 }])?;
271
272 this.http_client
273 .post_json(MIXPANEL_ENGAGE_URL, json_bytes.into())
274 .await?;
275 anyhow::Ok(())
276 }
277 .log_err(),
278 )
279 .detach();
280 }
281 }
282
283 pub fn report_clickhouse_event(
284 self: &Arc<Self>,
285 event: ClickhouseEvent,
286 telemetry_settings: TelemetrySettings,
287 ) {
288 if !telemetry_settings.metrics() {
289 return;
290 }
291
292 let mut state = self.state.lock();
293 let signed_in = state.metrics_id.is_some();
294 state.clickhouse_events_queue.push(ClickhouseEventWrapper {
295 time: SystemTime::now()
296 .duration_since(UNIX_EPOCH)
297 .unwrap()
298 .as_millis(),
299 signed_in,
300 event,
301 });
302
303 if state.installation_id.is_some() {
304 if state.mixpanel_events_queue.len() >= MAX_QUEUE_LEN {
305 drop(state);
306 self.flush_clickhouse_events();
307 } else {
308 let this = self.clone();
309 let executor = self.executor.clone();
310 state.flush_clickhouse_events_task = Some(self.executor.spawn(async move {
311 executor.timer(DEBOUNCE_INTERVAL).await;
312 this.flush_clickhouse_events();
313 }));
314 }
315 }
316 }
317
318 pub fn report_mixpanel_event(
319 self: &Arc<Self>,
320 kind: &str,
321 properties: Value,
322 telemetry_settings: TelemetrySettings,
323 ) {
324 if !telemetry_settings.metrics() {
325 return;
326 }
327
328 let mut state = self.state.lock();
329 let event = MixpanelEvent {
330 event: kind.into(),
331 properties: MixpanelEventProperties {
332 token: "",
333 time: SystemTime::now()
334 .duration_since(UNIX_EPOCH)
335 .unwrap()
336 .as_millis(),
337 installation_id: state.installation_id.clone(),
338 insert_id: post_inc(&mut state.next_mixpanel_event_id),
339 event_properties: if let Value::Object(properties) = properties {
340 Some(properties)
341 } else {
342 None
343 },
344 os_name: state.os_name,
345 os_version: state.os_version.clone(),
346 release_channel: state.release_channel,
347 app_version: state.app_version.clone(),
348 signed_in: state.metrics_id.is_some(),
349 },
350 };
351 state.mixpanel_events_queue.push(event);
352 if state.installation_id.is_some() {
353 if state.mixpanel_events_queue.len() >= MAX_QUEUE_LEN {
354 drop(state);
355 self.flush_mixpanel_events();
356 } else {
357 let this = self.clone();
358 let executor = self.executor.clone();
359 state.flush_mixpanel_events_task = Some(self.executor.spawn(async move {
360 executor.timer(DEBOUNCE_INTERVAL).await;
361 this.flush_mixpanel_events();
362 }));
363 }
364 }
365 }
366
367 pub fn metrics_id(self: &Arc<Self>) -> Option<Arc<str>> {
368 self.state.lock().metrics_id.clone()
369 }
370
371 pub fn installation_id(self: &Arc<Self>) -> Option<Arc<str>> {
372 self.state.lock().installation_id.clone()
373 }
374
375 pub fn is_staff(self: &Arc<Self>) -> Option<bool> {
376 self.state.lock().is_staff
377 }
378
379 fn flush_mixpanel_events(self: &Arc<Self>) {
380 let mut state = self.state.lock();
381 let mut events = mem::take(&mut state.mixpanel_events_queue);
382 state.flush_mixpanel_events_task.take();
383 drop(state);
384
385 if let Some(token) = MIXPANEL_TOKEN.as_ref() {
386 let this = self.clone();
387 self.executor
388 .spawn(
389 async move {
390 let mut json_bytes = Vec::new();
391
392 if let Some(file) = &mut this.state.lock().log_file {
393 let file = file.as_file_mut();
394 for event in &mut events {
395 json_bytes.clear();
396 serde_json::to_writer(&mut json_bytes, event)?;
397 file.write_all(&json_bytes)?;
398 file.write(b"\n")?;
399
400 event.properties.token = token;
401 }
402 }
403
404 json_bytes.clear();
405 serde_json::to_writer(&mut json_bytes, &events)?;
406 this.http_client
407 .post_json(MIXPANEL_EVENTS_URL, json_bytes.into())
408 .await?;
409 anyhow::Ok(())
410 }
411 .log_err(),
412 )
413 .detach();
414 }
415 }
416
417 fn flush_clickhouse_events(self: &Arc<Self>) {
418 let mut state = self.state.lock();
419 let mut events = mem::take(&mut state.clickhouse_events_queue);
420 state.flush_clickhouse_events_task.take();
421 drop(state);
422
423 let this = self.clone();
424 self.executor
425 .spawn(
426 async move {
427 let mut json_bytes = Vec::new();
428
429 if let Some(file) = &mut this.state.lock().log_file {
430 let file = file.as_file_mut();
431 for event in &mut events {
432 json_bytes.clear();
433 serde_json::to_writer(&mut json_bytes, event)?;
434 file.write_all(&json_bytes)?;
435 file.write(b"\n")?;
436 }
437 }
438
439 {
440 let state = this.state.lock();
441 json_bytes.clear();
442 serde_json::to_writer(
443 &mut json_bytes,
444 &ClickhouseEventRequestBody {
445 token: ZED_SECRET_CLIENT_TOKEN,
446 installation_id: state.installation_id.clone(),
447 app_version: state.app_version.clone(),
448 os_name: state.os_name,
449 os_version: state.os_version.clone(),
450 release_channel: state.release_channel,
451 events,
452 },
453 )?;
454 }
455
456 this.http_client
457 .post_json(CLICKHOUSE_EVENTS_URL.as_str(), json_bytes.into())
458 .await?;
459 anyhow::Ok(())
460 }
461 .log_err(),
462 )
463 .detach();
464 }
465}