telemetry.rs

  1use crate::{TelemetrySettings, ZED_SECRET_CLIENT_TOKEN, ZED_SERVER_URL};
  2use db::kvp::KEY_VALUE_STORE;
  3use gpui::{
  4    executor::Background,
  5    serde_json::{self, value::Map, Value},
  6    AppContext, Task,
  7};
  8use lazy_static::lazy_static;
  9use parking_lot::Mutex;
 10use serde::Serialize;
 11use serde_json::json;
 12use std::{
 13    env,
 14    io::Write,
 15    mem,
 16    path::PathBuf,
 17    sync::Arc,
 18    time::{Duration, SystemTime, UNIX_EPOCH},
 19};
 20use tempfile::NamedTempFile;
 21use util::http::HttpClient;
 22use util::{channel::ReleaseChannel, post_inc, ResultExt, TryFutureExt};
 23use uuid::Uuid;
 24
/// Telemetry client: queues events and sends them over HTTP from tasks
/// spawned on the background executor.
pub struct Telemetry {
    // Client used for every outgoing telemetry request.
    http_client: Arc<dyn HttpClient>,
    // Executor on which flushes, timers and other telemetry tasks are spawned.
    executor: Arc<Background>,
    // All mutable telemetry data; lock it only for short, non-awaiting sections.
    state: Mutex<TelemetryState>,
}
 30
/// Mutable data behind `Telemetry::state`: identifiers, platform details,
/// pending event queues and the handles of the pending debounced flushes.
#[derive(Default)]
struct TelemetryState {
    metrics_id: Option<Arc<str>>,      // Per logged-in user
    installation_id: Option<Arc<str>>, // Per app installation
    app_version: Option<Arc<str>>,
    release_channel: Option<&'static str>,
    os_name: &'static str,
    os_version: Option<Arc<str>>,
    // Taken from `env::consts::ARCH` when the telemetry object is created.
    architecture: &'static str,
    // Events reported but not yet flushed, one queue per backend.
    mixpanel_events_queue: Vec<MixpanelEvent>,
    clickhouse_events_queue: Vec<ClickhouseEventWrapper>,
    // Counter used for the `$insert_id` of the next Mixpanel event.
    next_mixpanel_event_id: usize,
    // Pending debounced flushes; replaced on each new event and cleared on flush.
    flush_mixpanel_events_task: Option<Task<()>>,
    flush_clickhouse_events_task: Option<Task<()>>,
    // Temporary file to which flushed events are appended as JSON lines.
    log_file: Option<NamedTempFile>,
    // `None` until authenticated user info has been set.
    is_staff: Option<bool>,
}
 48
 49const MIXPANEL_EVENTS_URL: &'static str = "https://api.mixpanel.com/track";
 50const MIXPANEL_ENGAGE_URL: &'static str = "https://api.mixpanel.com/engage#profile-set";
 51const CLICKHOUSE_EVENTS_URL_PATH: &'static str = "/api/events";
 52
 53lazy_static! {
 54    static ref MIXPANEL_TOKEN: Option<String> = std::env::var("ZED_MIXPANEL_TOKEN")
 55        .ok()
 56        .or_else(|| option_env!("ZED_MIXPANEL_TOKEN").map(|key| key.to_string()));
 57    static ref CLICKHOUSE_EVENTS_URL: String =
 58        format!("{}{}", *ZED_SERVER_URL, CLICKHOUSE_EVENTS_URL_PATH);
 59}
 60
/// JSON body posted to the Zed server's events endpoint: client and platform
/// metadata followed by the batch of events being flushed.
#[derive(Serialize, Debug)]
struct ClickhouseEventRequestBody {
    // Filled with `ZED_SECRET_CLIENT_TOKEN` when the body is built.
    token: &'static str,
    installation_id: Option<Arc<str>>,
    app_version: Option<Arc<str>>,
    os_name: &'static str,
    os_version: Option<Arc<str>>,
    architecture: &'static str,
    release_channel: Option<&'static str>,
    // The events drained from the queue for this flush.
    events: Vec<ClickhouseEventWrapper>,
}
 72
/// A queued event together with the metadata captured when it was reported.
#[derive(Serialize, Debug)]
struct ClickhouseEventWrapper {
    // Milliseconds since the UNIX epoch at the moment the event was reported.
    time: u128,
    // Whether a metrics id was set when the event was reported.
    signed_in: bool,
    // The event's own fields are serialized inline, next to `time` and `signed_in`.
    #[serde(flatten)]
    event: ClickhouseEvent,
}
 80
/// Events that can be reported through `Telemetry::report_clickhouse_event`.
///
/// Serialized with a `type` field holding the variant name, alongside the
/// variant's own fields.
#[derive(Serialize, Debug)]
#[serde(tag = "type")]
pub enum ClickhouseEvent {
    // Editor-related event; field meanings are defined by the reporting code.
    Editor {
        operation: &'static str,
        file_extension: Option<String>,
        vim_mode: bool,
        copilot_enabled: bool,
        copilot_enabled_for_language: bool,
    },
    // Copilot suggestion event; field meanings are defined by the reporting code.
    Copilot {
        suggestion_id: Option<String>,
        suggestion_accepted: bool,
        file_extension: Option<String>,
    },
}
 97
/// A single Mixpanel event: its name plus the properties sent with it.
#[derive(Serialize, Debug)]
struct MixpanelEvent {
    // Event name, taken from the `kind` argument of `report_mixpanel_event`.
    event: String,
    properties: MixpanelEventProperties,
}
103
/// Properties serialized with every Mixpanel event.
#[derive(Serialize, Debug)]
struct MixpanelEventProperties {
    // Mixpanel required fields
    // Left empty (and therefore omitted from the JSON) until the event is flushed.
    #[serde(skip_serializing_if = "str::is_empty")]
    token: &'static str,
    // Milliseconds since the UNIX epoch at the moment the event was reported.
    time: u128,
    #[serde(rename = "distinct_id")]
    installation_id: Option<Arc<str>>,
    // Value of the per-process event counter when the event was reported.
    #[serde(rename = "$insert_id")]
    insert_id: usize,
    // Custom fields
    // Caller-supplied properties, serialized inline with the fields above.
    #[serde(skip_serializing_if = "Option::is_none", flatten)]
    event_properties: Option<Map<String, Value>>,
    #[serde(rename = "OS Name")]
    os_name: &'static str,
    #[serde(rename = "OS Version")]
    os_version: Option<Arc<str>>,
    #[serde(rename = "Release Channel")]
    release_channel: Option<&'static str>,
    #[serde(rename = "App Version")]
    app_version: Option<Arc<str>>,
    // Whether a metrics id was set when the event was reported.
    #[serde(rename = "Signed In")]
    signed_in: bool,
}
128
/// One entry of the JSON array posted to the Mixpanel engage endpoint.
#[derive(Serialize)]
struct MixpanelEngageRequest {
    #[serde(rename = "$token")]
    token: &'static str,
    #[serde(rename = "$distinct_id")]
    installation_id: Arc<str>,
    // Arbitrary JSON sent under the `$set` key.
    #[serde(rename = "$set")]
    set: Value,
}
138
/// Queue length at which a report triggers an immediate flush instead of a
/// debounced one: 1 in builds with debug assertions, 10 otherwise.
const MAX_QUEUE_LEN: usize = if cfg!(debug_assertions) { 1 } else { 10 };

/// Delay before a debounced flush fires: 1 second in builds with debug
/// assertions, 30 seconds otherwise.
const DEBOUNCE_INTERVAL: Duration =
    Duration::from_secs(if cfg!(debug_assertions) { 1 } else { 30 });
150
151impl Telemetry {
152    pub fn new(client: Arc<dyn HttpClient>, cx: &AppContext) -> Arc<Self> {
153        let platform = cx.platform();
154        let release_channel = if cx.has_global::<ReleaseChannel>() {
155            Some(cx.global::<ReleaseChannel>().display_name())
156        } else {
157            None
158        };
159        // TODO: Replace all hardware stuff with nested SystemSpecs json
160        let this = Arc::new(Self {
161            http_client: client,
162            executor: cx.background().clone(),
163            state: Mutex::new(TelemetryState {
164                os_name: platform.os_name().into(),
165                os_version: platform.os_version().ok().map(|v| v.to_string().into()),
166                architecture: env::consts::ARCH,
167                app_version: platform.app_version().ok().map(|v| v.to_string().into()),
168                release_channel,
169                installation_id: None,
170                metrics_id: None,
171                mixpanel_events_queue: Default::default(),
172                clickhouse_events_queue: Default::default(),
173                flush_mixpanel_events_task: Default::default(),
174                flush_clickhouse_events_task: Default::default(),
175                next_mixpanel_event_id: 0,
176                log_file: None,
177                is_staff: None,
178            }),
179        });
180
181        if MIXPANEL_TOKEN.is_some() {
182            this.executor
183                .spawn({
184                    let this = this.clone();
185                    async move {
186                        if let Some(tempfile) = NamedTempFile::new().log_err() {
187                            this.state.lock().log_file = Some(tempfile);
188                        }
189                    }
190                })
191                .detach();
192        }
193
194        this
195    }
196
197    pub fn log_file_path(&self) -> Option<PathBuf> {
198        Some(self.state.lock().log_file.as_ref()?.path().to_path_buf())
199    }
200
201    pub fn start(self: &Arc<Self>) {
202        let this = self.clone();
203        self.executor
204            .spawn(
205                async move {
206                    let installation_id =
207                        if let Ok(Some(installation_id)) = KEY_VALUE_STORE.read_kvp("device_id") {
208                            installation_id
209                        } else {
210                            let installation_id = Uuid::new_v4().to_string();
211                            KEY_VALUE_STORE
212                                .write_kvp("device_id".to_string(), installation_id.clone())
213                                .await?;
214                            installation_id
215                        };
216
217                    let installation_id: Arc<str> = installation_id.into();
218                    let mut state = this.state.lock();
219                    state.installation_id = Some(installation_id.clone());
220
221                    for event in &mut state.mixpanel_events_queue {
222                        event
223                            .properties
224                            .installation_id
225                            .get_or_insert_with(|| installation_id.clone());
226                    }
227
228                    let has_mixpanel_events = !state.mixpanel_events_queue.is_empty();
229                    let has_clickhouse_events = !state.clickhouse_events_queue.is_empty();
230                    drop(state);
231
232                    if has_mixpanel_events {
233                        this.flush_mixpanel_events();
234                    }
235
236                    if has_clickhouse_events {
237                        this.flush_clickhouse_events();
238                    }
239
240                    anyhow::Ok(())
241                }
242                .log_err(),
243            )
244            .detach();
245    }
246
247    /// This method takes the entire TelemetrySettings struct in order to force client code
248    /// to pull the struct out of the settings global. Do not remove!
249    pub fn set_authenticated_user_info(
250        self: &Arc<Self>,
251        metrics_id: Option<String>,
252        is_staff: bool,
253        cx: &AppContext,
254    ) {
255        if !settings::get::<TelemetrySettings>(cx).metrics {
256            return;
257        }
258
259        let this = self.clone();
260        let mut state = self.state.lock();
261        let installation_id = state.installation_id.clone();
262        let metrics_id: Option<Arc<str>> = metrics_id.map(|id| id.into());
263        state.metrics_id = metrics_id.clone();
264        state.is_staff = Some(is_staff);
265        drop(state);
266
267        if let Some((token, installation_id)) = MIXPANEL_TOKEN.as_ref().zip(installation_id) {
268            self.executor
269                .spawn(
270                    async move {
271                        let json_bytes = serde_json::to_vec(&[MixpanelEngageRequest {
272                            token,
273                            installation_id,
274                            set: json!({
275                                "Staff": is_staff,
276                                "ID": metrics_id,
277                                "App": true
278                            }),
279                        }])?;
280
281                        this.http_client
282                            .post_json(MIXPANEL_ENGAGE_URL, json_bytes.into())
283                            .await?;
284                        anyhow::Ok(())
285                    }
286                    .log_err(),
287                )
288                .detach();
289        }
290    }
291
292    pub fn report_clickhouse_event(
293        self: &Arc<Self>,
294        event: ClickhouseEvent,
295        telemetry_settings: TelemetrySettings,
296    ) {
297        if !telemetry_settings.metrics {
298            return;
299        }
300
301        let mut state = self.state.lock();
302        let signed_in = state.metrics_id.is_some();
303        state.clickhouse_events_queue.push(ClickhouseEventWrapper {
304            time: SystemTime::now()
305                .duration_since(UNIX_EPOCH)
306                .unwrap()
307                .as_millis(),
308            signed_in,
309            event,
310        });
311
312        if state.installation_id.is_some() {
313            if state.mixpanel_events_queue.len() >= MAX_QUEUE_LEN {
314                drop(state);
315                self.flush_clickhouse_events();
316            } else {
317                let this = self.clone();
318                let executor = self.executor.clone();
319                state.flush_clickhouse_events_task = Some(self.executor.spawn(async move {
320                    executor.timer(DEBOUNCE_INTERVAL).await;
321                    this.flush_clickhouse_events();
322                }));
323            }
324        }
325    }
326
327    pub fn report_mixpanel_event(
328        self: &Arc<Self>,
329        kind: &str,
330        properties: Value,
331        telemetry_settings: TelemetrySettings,
332    ) {
333        if !telemetry_settings.metrics {
334            return;
335        }
336
337        let mut state = self.state.lock();
338        let event = MixpanelEvent {
339            event: kind.into(),
340            properties: MixpanelEventProperties {
341                token: "",
342                time: SystemTime::now()
343                    .duration_since(UNIX_EPOCH)
344                    .unwrap()
345                    .as_millis(),
346                installation_id: state.installation_id.clone(),
347                insert_id: post_inc(&mut state.next_mixpanel_event_id),
348                event_properties: if let Value::Object(properties) = properties {
349                    Some(properties)
350                } else {
351                    None
352                },
353                os_name: state.os_name,
354                os_version: state.os_version.clone(),
355                release_channel: state.release_channel,
356                app_version: state.app_version.clone(),
357                signed_in: state.metrics_id.is_some(),
358            },
359        };
360        state.mixpanel_events_queue.push(event);
361        if state.installation_id.is_some() {
362            if state.mixpanel_events_queue.len() >= MAX_QUEUE_LEN {
363                drop(state);
364                self.flush_mixpanel_events();
365            } else {
366                let this = self.clone();
367                let executor = self.executor.clone();
368                state.flush_mixpanel_events_task = Some(self.executor.spawn(async move {
369                    executor.timer(DEBOUNCE_INTERVAL).await;
370                    this.flush_mixpanel_events();
371                }));
372            }
373        }
374    }
375
376    pub fn metrics_id(self: &Arc<Self>) -> Option<Arc<str>> {
377        self.state.lock().metrics_id.clone()
378    }
379
380    pub fn installation_id(self: &Arc<Self>) -> Option<Arc<str>> {
381        self.state.lock().installation_id.clone()
382    }
383
384    pub fn is_staff(self: &Arc<Self>) -> Option<bool> {
385        self.state.lock().is_staff
386    }
387
388    fn flush_mixpanel_events(self: &Arc<Self>) {
389        let mut state = self.state.lock();
390        let mut events = mem::take(&mut state.mixpanel_events_queue);
391        state.flush_mixpanel_events_task.take();
392        drop(state);
393
394        if let Some(token) = MIXPANEL_TOKEN.as_ref() {
395            let this = self.clone();
396            self.executor
397                .spawn(
398                    async move {
399                        let mut json_bytes = Vec::new();
400
401                        if let Some(file) = &mut this.state.lock().log_file {
402                            let file = file.as_file_mut();
403                            for event in &mut events {
404                                json_bytes.clear();
405                                serde_json::to_writer(&mut json_bytes, event)?;
406                                file.write_all(&json_bytes)?;
407                                file.write(b"\n")?;
408
409                                event.properties.token = token;
410                            }
411                        }
412
413                        json_bytes.clear();
414                        serde_json::to_writer(&mut json_bytes, &events)?;
415                        this.http_client
416                            .post_json(MIXPANEL_EVENTS_URL, json_bytes.into())
417                            .await?;
418                        anyhow::Ok(())
419                    }
420                    .log_err(),
421                )
422                .detach();
423        }
424    }
425
426    fn flush_clickhouse_events(self: &Arc<Self>) {
427        let mut state = self.state.lock();
428        let mut events = mem::take(&mut state.clickhouse_events_queue);
429        state.flush_clickhouse_events_task.take();
430        drop(state);
431
432        let this = self.clone();
433        self.executor
434            .spawn(
435                async move {
436                    let mut json_bytes = Vec::new();
437
438                    if let Some(file) = &mut this.state.lock().log_file {
439                        let file = file.as_file_mut();
440                        for event in &mut events {
441                            json_bytes.clear();
442                            serde_json::to_writer(&mut json_bytes, event)?;
443                            file.write_all(&json_bytes)?;
444                            file.write(b"\n")?;
445                        }
446                    }
447
448                    {
449                        let state = this.state.lock();
450                        json_bytes.clear();
451                        serde_json::to_writer(
452                            &mut json_bytes,
453                            &ClickhouseEventRequestBody {
454                                token: ZED_SECRET_CLIENT_TOKEN,
455                                installation_id: state.installation_id.clone(),
456                                app_version: state.app_version.clone(),
457                                os_name: state.os_name,
458                                os_version: state.os_version.clone(),
459                                architecture: state.architecture,
460
461                                release_channel: state.release_channel,
462                                events,
463                            },
464                        )?;
465                    }
466
467                    this.http_client
468                        .post_json(CLICKHOUSE_EVENTS_URL.as_str(), json_bytes.into())
469                        .await?;
470                    anyhow::Ok(())
471                }
472                .log_err(),
473            )
474            .detach();
475    }
476}