telemetry.rs

  1use crate::{TelemetrySettings, ZED_SECRET_CLIENT_TOKEN, ZED_SERVER_URL};
  2use db::kvp::KEY_VALUE_STORE;
  3use gpui::{
  4    executor::Background,
  5    serde_json::{self, value::Map, Value},
  6    AppContext, Task,
  7};
  8use lazy_static::lazy_static;
  9use parking_lot::Mutex;
 10use serde::Serialize;
 11use serde_json::json;
 12use std::{
 13    io::Write,
 14    mem,
 15    path::PathBuf,
 16    sync::Arc,
 17    time::{Duration, SystemTime, UNIX_EPOCH},
 18};
 19use tempfile::NamedTempFile;
 20use util::http::HttpClient;
 21use util::{channel::ReleaseChannel, post_inc, ResultExt, TryFutureExt};
 22use uuid::Uuid;
 23
 24pub struct Telemetry {
 25    http_client: Arc<dyn HttpClient>,
 26    executor: Arc<Background>,
 27    state: Mutex<TelemetryState>,
 28}
 29
 30#[derive(Default)]
 31struct TelemetryState {
 32    metrics_id: Option<Arc<str>>,      // Per logged-in user
 33    installation_id: Option<Arc<str>>, // Per app installation
 34    app_version: Option<Arc<str>>,
 35    release_channel: Option<&'static str>,
 36    os_version: Option<Arc<str>>,
 37    os_name: &'static str,
 38    mixpanel_events_queue: Vec<MixpanelEvent>,
 39    clickhouse_events_queue: Vec<ClickhouseEventWrapper>,
 40    next_mixpanel_event_id: usize,
 41    flush_mixpanel_events_task: Option<Task<()>>,
 42    flush_clickhouse_events_task: Option<Task<()>>,
 43    log_file: Option<NamedTempFile>,
 44    is_staff: Option<bool>,
 45}
 46
 47const MIXPANEL_EVENTS_URL: &'static str = "https://api.mixpanel.com/track";
 48const MIXPANEL_ENGAGE_URL: &'static str = "https://api.mixpanel.com/engage#profile-set";
 49const CLICKHOUSE_EVENTS_URL_PATH: &'static str = "/api/events";
 50
 51lazy_static! {
 52    static ref MIXPANEL_TOKEN: Option<String> = std::env::var("ZED_MIXPANEL_TOKEN")
 53        .ok()
 54        .or_else(|| option_env!("ZED_MIXPANEL_TOKEN").map(|key| key.to_string()));
 55    static ref CLICKHOUSE_EVENTS_URL: String =
 56        format!("{}{}", *ZED_SERVER_URL, CLICKHOUSE_EVENTS_URL_PATH);
 57}
 58
 59#[derive(Serialize, Debug)]
 60struct ClickhouseEventRequestBody {
 61    token: &'static str,
 62    installation_id: Option<Arc<str>>,
 63    app_version: Option<Arc<str>>,
 64    os_name: &'static str,
 65    os_version: Option<Arc<str>>,
 66    release_channel: Option<&'static str>,
 67    events: Vec<ClickhouseEventWrapper>,
 68}
 69
 70#[derive(Serialize, Debug)]
 71struct ClickhouseEventWrapper {
 72    time: u128,
 73    signed_in: bool,
 74    #[serde(flatten)]
 75    event: ClickhouseEvent,
 76}
 77
 78#[derive(Serialize, Debug)]
 79#[serde(tag = "type")]
 80pub enum ClickhouseEvent {
 81    Editor {
 82        operation: &'static str,
 83        file_extension: Option<String>,
 84        vim_mode: bool,
 85        copilot_enabled: bool,
 86        copilot_enabled_for_language: bool,
 87    },
 88    Copilot {
 89        suggestion_id: Option<String>,
 90        suggestion_accepted: bool,
 91        file_extension: Option<String>,
 92    },
 93}
 94
 95#[derive(Serialize, Debug)]
 96struct MixpanelEvent {
 97    event: String,
 98    properties: MixpanelEventProperties,
 99}
100
101#[derive(Serialize, Debug)]
102struct MixpanelEventProperties {
103    // Mixpanel required fields
104    #[serde(skip_serializing_if = "str::is_empty")]
105    token: &'static str,
106    time: u128,
107    #[serde(rename = "distinct_id")]
108    installation_id: Option<Arc<str>>,
109    #[serde(rename = "$insert_id")]
110    insert_id: usize,
111    // Custom fields
112    #[serde(skip_serializing_if = "Option::is_none", flatten)]
113    event_properties: Option<Map<String, Value>>,
114    #[serde(rename = "OS Name")]
115    os_name: &'static str,
116    #[serde(rename = "OS Version")]
117    os_version: Option<Arc<str>>,
118    #[serde(rename = "Release Channel")]
119    release_channel: Option<&'static str>,
120    #[serde(rename = "App Version")]
121    app_version: Option<Arc<str>>,
122    #[serde(rename = "Signed In")]
123    signed_in: bool,
124}
125
126#[derive(Serialize)]
127struct MixpanelEngageRequest {
128    #[serde(rename = "$token")]
129    token: &'static str,
130    #[serde(rename = "$distinct_id")]
131    installation_id: Arc<str>,
132    #[serde(rename = "$set")]
133    set: Value,
134}
135
136#[cfg(debug_assertions)]
137const MAX_QUEUE_LEN: usize = 1;
138
139#[cfg(not(debug_assertions))]
140const MAX_QUEUE_LEN: usize = 10;
141
142#[cfg(debug_assertions)]
143const DEBOUNCE_INTERVAL: Duration = Duration::from_secs(1);
144
145#[cfg(not(debug_assertions))]
146const DEBOUNCE_INTERVAL: Duration = Duration::from_secs(30);
147
148impl Telemetry {
149    pub fn new(client: Arc<dyn HttpClient>, cx: &AppContext) -> Arc<Self> {
150        let platform = cx.platform();
151        let release_channel = if cx.has_global::<ReleaseChannel>() {
152            Some(cx.global::<ReleaseChannel>().display_name())
153        } else {
154            None
155        };
156        let this = Arc::new(Self {
157            http_client: client,
158            executor: cx.background().clone(),
159            state: Mutex::new(TelemetryState {
160                os_version: platform.os_version().ok().map(|v| v.to_string().into()),
161                os_name: platform.os_name().into(),
162                app_version: platform.app_version().ok().map(|v| v.to_string().into()),
163                release_channel,
164                installation_id: None,
165                metrics_id: None,
166                mixpanel_events_queue: Default::default(),
167                clickhouse_events_queue: Default::default(),
168                flush_mixpanel_events_task: Default::default(),
169                flush_clickhouse_events_task: Default::default(),
170                next_mixpanel_event_id: 0,
171                log_file: None,
172                is_staff: None,
173            }),
174        });
175
176        if MIXPANEL_TOKEN.is_some() {
177            this.executor
178                .spawn({
179                    let this = this.clone();
180                    async move {
181                        if let Some(tempfile) = NamedTempFile::new().log_err() {
182                            this.state.lock().log_file = Some(tempfile);
183                        }
184                    }
185                })
186                .detach();
187        }
188
189        this
190    }
191
192    pub fn log_file_path(&self) -> Option<PathBuf> {
193        Some(self.state.lock().log_file.as_ref()?.path().to_path_buf())
194    }
195
196    pub fn start(self: &Arc<Self>) {
197        let this = self.clone();
198        self.executor
199            .spawn(
200                async move {
201                    let installation_id =
202                        if let Ok(Some(installation_id)) = KEY_VALUE_STORE.read_kvp("device_id") {
203                            installation_id
204                        } else {
205                            let installation_id = Uuid::new_v4().to_string();
206                            KEY_VALUE_STORE
207                                .write_kvp("device_id".to_string(), installation_id.clone())
208                                .await?;
209                            installation_id
210                        };
211
212                    let installation_id: Arc<str> = installation_id.into();
213                    let mut state = this.state.lock();
214                    state.installation_id = Some(installation_id.clone());
215
216                    for event in &mut state.mixpanel_events_queue {
217                        event
218                            .properties
219                            .installation_id
220                            .get_or_insert_with(|| installation_id.clone());
221                    }
222
223                    let has_mixpanel_events = !state.mixpanel_events_queue.is_empty();
224                    let has_clickhouse_events = !state.clickhouse_events_queue.is_empty();
225                    drop(state);
226
227                    if has_mixpanel_events {
228                        this.flush_mixpanel_events();
229                    }
230
231                    if has_clickhouse_events {
232                        this.flush_clickhouse_events();
233                    }
234
235                    anyhow::Ok(())
236                }
237                .log_err(),
238            )
239            .detach();
240    }
241
242    /// This method takes the entire TelemetrySettings struct in order to force client code
243    /// to pull the struct out of the settings global. Do not remove!
244    pub fn set_authenticated_user_info(
245        self: &Arc<Self>,
246        metrics_id: Option<String>,
247        is_staff: bool,
248        cx: &AppContext,
249    ) {
250        if !settings::get::<TelemetrySettings>(cx).metrics {
251            return;
252        }
253
254        let this = self.clone();
255        let mut state = self.state.lock();
256        let installation_id = state.installation_id.clone();
257        let metrics_id: Option<Arc<str>> = metrics_id.map(|id| id.into());
258        state.metrics_id = metrics_id.clone();
259        state.is_staff = Some(is_staff);
260        drop(state);
261
262        if let Some((token, installation_id)) = MIXPANEL_TOKEN.as_ref().zip(installation_id) {
263            self.executor
264                .spawn(
265                    async move {
266                        let json_bytes = serde_json::to_vec(&[MixpanelEngageRequest {
267                            token,
268                            installation_id,
269                            set: json!({
270                                "Staff": is_staff,
271                                "ID": metrics_id,
272                                "App": true
273                            }),
274                        }])?;
275
276                        this.http_client
277                            .post_json(MIXPANEL_ENGAGE_URL, json_bytes.into())
278                            .await?;
279                        anyhow::Ok(())
280                    }
281                    .log_err(),
282                )
283                .detach();
284        }
285    }
286
287    pub fn report_clickhouse_event(
288        self: &Arc<Self>,
289        event: ClickhouseEvent,
290        telemetry_settings: TelemetrySettings,
291    ) {
292        if !telemetry_settings.metrics {
293            return;
294        }
295
296        let mut state = self.state.lock();
297        let signed_in = state.metrics_id.is_some();
298        state.clickhouse_events_queue.push(ClickhouseEventWrapper {
299            time: SystemTime::now()
300                .duration_since(UNIX_EPOCH)
301                .unwrap()
302                .as_millis(),
303            signed_in,
304            event,
305        });
306
307        if state.installation_id.is_some() {
308            if state.mixpanel_events_queue.len() >= MAX_QUEUE_LEN {
309                drop(state);
310                self.flush_clickhouse_events();
311            } else {
312                let this = self.clone();
313                let executor = self.executor.clone();
314                state.flush_clickhouse_events_task = Some(self.executor.spawn(async move {
315                    executor.timer(DEBOUNCE_INTERVAL).await;
316                    this.flush_clickhouse_events();
317                }));
318            }
319        }
320    }
321
322    pub fn report_mixpanel_event(
323        self: &Arc<Self>,
324        kind: &str,
325        properties: Value,
326        telemetry_settings: TelemetrySettings,
327    ) {
328        if !telemetry_settings.metrics {
329            return;
330        }
331
332        let mut state = self.state.lock();
333        let event = MixpanelEvent {
334            event: kind.into(),
335            properties: MixpanelEventProperties {
336                token: "",
337                time: SystemTime::now()
338                    .duration_since(UNIX_EPOCH)
339                    .unwrap()
340                    .as_millis(),
341                installation_id: state.installation_id.clone(),
342                insert_id: post_inc(&mut state.next_mixpanel_event_id),
343                event_properties: if let Value::Object(properties) = properties {
344                    Some(properties)
345                } else {
346                    None
347                },
348                os_name: state.os_name,
349                os_version: state.os_version.clone(),
350                release_channel: state.release_channel,
351                app_version: state.app_version.clone(),
352                signed_in: state.metrics_id.is_some(),
353            },
354        };
355        state.mixpanel_events_queue.push(event);
356        if state.installation_id.is_some() {
357            if state.mixpanel_events_queue.len() >= MAX_QUEUE_LEN {
358                drop(state);
359                self.flush_mixpanel_events();
360            } else {
361                let this = self.clone();
362                let executor = self.executor.clone();
363                state.flush_mixpanel_events_task = Some(self.executor.spawn(async move {
364                    executor.timer(DEBOUNCE_INTERVAL).await;
365                    this.flush_mixpanel_events();
366                }));
367            }
368        }
369    }
370
371    pub fn metrics_id(self: &Arc<Self>) -> Option<Arc<str>> {
372        self.state.lock().metrics_id.clone()
373    }
374
375    pub fn installation_id(self: &Arc<Self>) -> Option<Arc<str>> {
376        self.state.lock().installation_id.clone()
377    }
378
379    pub fn is_staff(self: &Arc<Self>) -> Option<bool> {
380        self.state.lock().is_staff
381    }
382
383    fn flush_mixpanel_events(self: &Arc<Self>) {
384        let mut state = self.state.lock();
385        let mut events = mem::take(&mut state.mixpanel_events_queue);
386        state.flush_mixpanel_events_task.take();
387        drop(state);
388
389        if let Some(token) = MIXPANEL_TOKEN.as_ref() {
390            let this = self.clone();
391            self.executor
392                .spawn(
393                    async move {
394                        let mut json_bytes = Vec::new();
395
396                        if let Some(file) = &mut this.state.lock().log_file {
397                            let file = file.as_file_mut();
398                            for event in &mut events {
399                                json_bytes.clear();
400                                serde_json::to_writer(&mut json_bytes, event)?;
401                                file.write_all(&json_bytes)?;
402                                file.write(b"\n")?;
403
404                                event.properties.token = token;
405                            }
406                        }
407
408                        json_bytes.clear();
409                        serde_json::to_writer(&mut json_bytes, &events)?;
410                        this.http_client
411                            .post_json(MIXPANEL_EVENTS_URL, json_bytes.into())
412                            .await?;
413                        anyhow::Ok(())
414                    }
415                    .log_err(),
416                )
417                .detach();
418        }
419    }
420
421    fn flush_clickhouse_events(self: &Arc<Self>) {
422        let mut state = self.state.lock();
423        let mut events = mem::take(&mut state.clickhouse_events_queue);
424        state.flush_clickhouse_events_task.take();
425        drop(state);
426
427        let this = self.clone();
428        self.executor
429            .spawn(
430                async move {
431                    let mut json_bytes = Vec::new();
432
433                    if let Some(file) = &mut this.state.lock().log_file {
434                        let file = file.as_file_mut();
435                        for event in &mut events {
436                            json_bytes.clear();
437                            serde_json::to_writer(&mut json_bytes, event)?;
438                            file.write_all(&json_bytes)?;
439                            file.write(b"\n")?;
440                        }
441                    }
442
443                    {
444                        let state = this.state.lock();
445                        json_bytes.clear();
446                        serde_json::to_writer(
447                            &mut json_bytes,
448                            &ClickhouseEventRequestBody {
449                                token: ZED_SECRET_CLIENT_TOKEN,
450                                installation_id: state.installation_id.clone(),
451                                app_version: state.app_version.clone(),
452                                os_name: state.os_name,
453                                os_version: state.os_version.clone(),
454                                release_channel: state.release_channel,
455                                events,
456                            },
457                        )?;
458                    }
459
460                    this.http_client
461                        .post_json(CLICKHOUSE_EVENTS_URL.as_str(), json_bytes.into())
462                        .await?;
463                    anyhow::Ok(())
464                }
465                .log_err(),
466            )
467            .detach();
468    }
469}