telemetry.rs

  1use crate::{ZED_SECRET_CLIENT_TOKEN, ZED_SERVER_URL};
  2use db::kvp::KEY_VALUE_STORE;
  3use gpui::{
  4    executor::Background,
  5    serde_json::{self, value::Map, Value},
  6    AppContext, Task,
  7};
  8use lazy_static::lazy_static;
  9use parking_lot::Mutex;
 10use serde::Serialize;
 11use serde_json::json;
 12use settings::TelemetrySettings;
 13use std::{
 14    io::Write,
 15    mem,
 16    path::PathBuf,
 17    sync::Arc,
 18    time::{Duration, SystemTime, UNIX_EPOCH},
 19};
 20use tempfile::NamedTempFile;
 21use util::http::HttpClient;
 22use util::{channel::ReleaseChannel, post_inc, ResultExt, TryFutureExt};
 23use uuid::Uuid;
 24
 25pub struct Telemetry {
 26    http_client: Arc<dyn HttpClient>,
 27    executor: Arc<Background>,
 28    state: Mutex<TelemetryState>,
 29}
 30
 31#[derive(Default)]
 32struct TelemetryState {
 33    metrics_id: Option<Arc<str>>,      // Per logged-in user
 34    installation_id: Option<Arc<str>>, // Per app installation
 35    app_version: Option<Arc<str>>,
 36    release_channel: Option<&'static str>,
 37    os_version: Option<Arc<str>>,
 38    os_name: &'static str,
 39    mixpanel_events_queue: Vec<MixpanelEvent>,
 40    clickhouse_events_queue: Vec<ClickhouseEventWrapper>,
 41    next_mixpanel_event_id: usize,
 42    flush_mixpanel_events_task: Option<Task<()>>,
 43    flush_clickhouse_events_task: Option<Task<()>>,
 44    log_file: Option<NamedTempFile>,
 45    is_staff: Option<bool>,
 46}
 47
 48const MIXPANEL_EVENTS_URL: &'static str = "https://api.mixpanel.com/track";
 49const MIXPANEL_ENGAGE_URL: &'static str = "https://api.mixpanel.com/engage#profile-set";
 50const CLICKHOUSE_EVENTS_URL_PATH: &'static str = "/api/events";
 51
 52lazy_static! {
 53    static ref MIXPANEL_TOKEN: Option<String> = std::env::var("ZED_MIXPANEL_TOKEN")
 54        .ok()
 55        .or_else(|| option_env!("ZED_MIXPANEL_TOKEN").map(|key| key.to_string()));
 56    static ref CLICKHOUSE_EVENTS_URL: String =
 57        format!("{}{}", *ZED_SERVER_URL, CLICKHOUSE_EVENTS_URL_PATH);
 58}
 59
 60#[derive(Serialize, Debug)]
 61struct ClickhouseEventRequestBody {
 62    token: &'static str,
 63    installation_id: Option<Arc<str>>,
 64    app_version: Option<Arc<str>>,
 65    os_name: &'static str,
 66    os_version: Option<Arc<str>>,
 67    release_channel: Option<&'static str>,
 68    events: Vec<ClickhouseEventWrapper>,
 69}
 70
 71#[derive(Serialize, Debug)]
 72struct ClickhouseEventWrapper {
 73    time: u128,
 74    signed_in: bool,
 75    #[serde(flatten)]
 76    event: ClickhouseEvent,
 77}
 78
 79#[derive(Serialize, Debug)]
 80#[serde(tag = "type")]
 81pub enum ClickhouseEvent {
 82    Editor {
 83        operation: &'static str,
 84        file_extension: Option<String>,
 85        vim_mode: bool,
 86        copilot_enabled: bool,
 87        copilot_enabled_for_language: bool,
 88    },
 89    Copilot {
 90        suggestion_id: Option<String>,
 91        suggestion_accepted: bool,
 92        file_extension: Option<String>,
 93    },
 94}
 95
 96#[derive(Serialize, Debug)]
 97struct MixpanelEvent {
 98    event: String,
 99    properties: MixpanelEventProperties,
100}
101
102#[derive(Serialize, Debug)]
103struct MixpanelEventProperties {
104    // Mixpanel required fields
105    #[serde(skip_serializing_if = "str::is_empty")]
106    token: &'static str,
107    time: u128,
108    #[serde(rename = "distinct_id")]
109    installation_id: Option<Arc<str>>,
110    #[serde(rename = "$insert_id")]
111    insert_id: usize,
112    // Custom fields
113    #[serde(skip_serializing_if = "Option::is_none", flatten)]
114    event_properties: Option<Map<String, Value>>,
115    #[serde(rename = "OS Name")]
116    os_name: &'static str,
117    #[serde(rename = "OS Version")]
118    os_version: Option<Arc<str>>,
119    #[serde(rename = "Release Channel")]
120    release_channel: Option<&'static str>,
121    #[serde(rename = "App Version")]
122    app_version: Option<Arc<str>>,
123    #[serde(rename = "Signed In")]
124    signed_in: bool,
125}
126
127#[derive(Serialize)]
128struct MixpanelEngageRequest {
129    #[serde(rename = "$token")]
130    token: &'static str,
131    #[serde(rename = "$distinct_id")]
132    installation_id: Arc<str>,
133    #[serde(rename = "$set")]
134    set: Value,
135}
136
137#[cfg(debug_assertions)]
138const MAX_QUEUE_LEN: usize = 1;
139
140#[cfg(not(debug_assertions))]
141const MAX_QUEUE_LEN: usize = 10;
142
143#[cfg(debug_assertions)]
144const DEBOUNCE_INTERVAL: Duration = Duration::from_secs(1);
145
146#[cfg(not(debug_assertions))]
147const DEBOUNCE_INTERVAL: Duration = Duration::from_secs(30);
148
149impl Telemetry {
150    pub fn new(client: Arc<dyn HttpClient>, cx: &AppContext) -> Arc<Self> {
151        let platform = cx.platform();
152        let release_channel = if cx.has_global::<ReleaseChannel>() {
153            Some(cx.global::<ReleaseChannel>().display_name())
154        } else {
155            None
156        };
157        let this = Arc::new(Self {
158            http_client: client,
159            executor: cx.background().clone(),
160            state: Mutex::new(TelemetryState {
161                os_version: platform.os_version().ok().map(|v| v.to_string().into()),
162                os_name: platform.os_name().into(),
163                app_version: platform.app_version().ok().map(|v| v.to_string().into()),
164                release_channel,
165                installation_id: None,
166                metrics_id: None,
167                mixpanel_events_queue: Default::default(),
168                clickhouse_events_queue: Default::default(),
169                flush_mixpanel_events_task: Default::default(),
170                flush_clickhouse_events_task: Default::default(),
171                next_mixpanel_event_id: 0,
172                log_file: None,
173                is_staff: None,
174            }),
175        });
176
177        if MIXPANEL_TOKEN.is_some() {
178            this.executor
179                .spawn({
180                    let this = this.clone();
181                    async move {
182                        if let Some(tempfile) = NamedTempFile::new().log_err() {
183                            this.state.lock().log_file = Some(tempfile);
184                        }
185                    }
186                })
187                .detach();
188        }
189
190        this
191    }
192
193    pub fn log_file_path(&self) -> Option<PathBuf> {
194        Some(self.state.lock().log_file.as_ref()?.path().to_path_buf())
195    }
196
197    pub fn start(self: &Arc<Self>) {
198        let this = self.clone();
199        self.executor
200            .spawn(
201                async move {
202                    let installation_id =
203                        if let Ok(Some(installation_id)) = KEY_VALUE_STORE.read_kvp("device_id") {
204                            installation_id
205                        } else {
206                            let installation_id = Uuid::new_v4().to_string();
207                            KEY_VALUE_STORE
208                                .write_kvp("device_id".to_string(), installation_id.clone())
209                                .await?;
210                            installation_id
211                        };
212
213                    let installation_id: Arc<str> = installation_id.into();
214                    let mut state = this.state.lock();
215                    state.installation_id = Some(installation_id.clone());
216
217                    for event in &mut state.mixpanel_events_queue {
218                        event
219                            .properties
220                            .installation_id
221                            .get_or_insert_with(|| installation_id.clone());
222                    }
223
224                    let has_mixpanel_events = !state.mixpanel_events_queue.is_empty();
225                    let has_clickhouse_events = !state.clickhouse_events_queue.is_empty();
226                    drop(state);
227
228                    if has_mixpanel_events {
229                        this.flush_mixpanel_events();
230                    }
231
232                    if has_clickhouse_events {
233                        this.flush_clickhouse_events();
234                    }
235
236                    anyhow::Ok(())
237                }
238                .log_err(),
239            )
240            .detach();
241    }
242
243    /// This method takes the entire TelemetrySettings struct in order to force client code
244    /// to pull the struct out of the settings global. Do not remove!
245    pub fn set_authenticated_user_info(
246        self: &Arc<Self>,
247        metrics_id: Option<String>,
248        is_staff: bool,
249        telemetry_settings: TelemetrySettings,
250    ) {
251        if !telemetry_settings.metrics() {
252            return;
253        }
254
255        let this = self.clone();
256        let mut state = self.state.lock();
257        let installation_id = state.installation_id.clone();
258        let metrics_id: Option<Arc<str>> = metrics_id.map(|id| id.into());
259        state.metrics_id = metrics_id.clone();
260        state.is_staff = Some(is_staff);
261        drop(state);
262
263        if let Some((token, installation_id)) = MIXPANEL_TOKEN.as_ref().zip(installation_id) {
264            self.executor
265                .spawn(
266                    async move {
267                        let json_bytes = serde_json::to_vec(&[MixpanelEngageRequest {
268                            token,
269                            installation_id,
270                            set: json!({
271                                "Staff": is_staff,
272                                "ID": metrics_id,
273                                "App": true
274                            }),
275                        }])?;
276
277                        this.http_client
278                            .post_json(MIXPANEL_ENGAGE_URL, json_bytes.into())
279                            .await?;
280                        anyhow::Ok(())
281                    }
282                    .log_err(),
283                )
284                .detach();
285        }
286    }
287
288    pub fn report_clickhouse_event(
289        self: &Arc<Self>,
290        event: ClickhouseEvent,
291        telemetry_settings: TelemetrySettings,
292    ) {
293        if !telemetry_settings.metrics() {
294            return;
295        }
296
297        let mut state = self.state.lock();
298        let signed_in = state.metrics_id.is_some();
299        state.clickhouse_events_queue.push(ClickhouseEventWrapper {
300            time: SystemTime::now()
301                .duration_since(UNIX_EPOCH)
302                .unwrap()
303                .as_millis(),
304            signed_in,
305            event,
306        });
307
308        if state.installation_id.is_some() {
309            if state.mixpanel_events_queue.len() >= MAX_QUEUE_LEN {
310                drop(state);
311                self.flush_clickhouse_events();
312            } else {
313                let this = self.clone();
314                let executor = self.executor.clone();
315                state.flush_clickhouse_events_task = Some(self.executor.spawn(async move {
316                    executor.timer(DEBOUNCE_INTERVAL).await;
317                    this.flush_clickhouse_events();
318                }));
319            }
320        }
321    }
322
323    pub fn report_mixpanel_event(
324        self: &Arc<Self>,
325        kind: &str,
326        properties: Value,
327        telemetry_settings: TelemetrySettings,
328    ) {
329        if !telemetry_settings.metrics() {
330            return;
331        }
332
333        let mut state = self.state.lock();
334        let event = MixpanelEvent {
335            event: kind.into(),
336            properties: MixpanelEventProperties {
337                token: "",
338                time: SystemTime::now()
339                    .duration_since(UNIX_EPOCH)
340                    .unwrap()
341                    .as_millis(),
342                installation_id: state.installation_id.clone(),
343                insert_id: post_inc(&mut state.next_mixpanel_event_id),
344                event_properties: if let Value::Object(properties) = properties {
345                    Some(properties)
346                } else {
347                    None
348                },
349                os_name: state.os_name,
350                os_version: state.os_version.clone(),
351                release_channel: state.release_channel,
352                app_version: state.app_version.clone(),
353                signed_in: state.metrics_id.is_some(),
354            },
355        };
356        state.mixpanel_events_queue.push(event);
357        if state.installation_id.is_some() {
358            if state.mixpanel_events_queue.len() >= MAX_QUEUE_LEN {
359                drop(state);
360                self.flush_mixpanel_events();
361            } else {
362                let this = self.clone();
363                let executor = self.executor.clone();
364                state.flush_mixpanel_events_task = Some(self.executor.spawn(async move {
365                    executor.timer(DEBOUNCE_INTERVAL).await;
366                    this.flush_mixpanel_events();
367                }));
368            }
369        }
370    }
371
372    pub fn metrics_id(self: &Arc<Self>) -> Option<Arc<str>> {
373        self.state.lock().metrics_id.clone()
374    }
375
376    pub fn installation_id(self: &Arc<Self>) -> Option<Arc<str>> {
377        self.state.lock().installation_id.clone()
378    }
379
380    pub fn is_staff(self: &Arc<Self>) -> Option<bool> {
381        self.state.lock().is_staff
382    }
383
384    fn flush_mixpanel_events(self: &Arc<Self>) {
385        let mut state = self.state.lock();
386        let mut events = mem::take(&mut state.mixpanel_events_queue);
387        state.flush_mixpanel_events_task.take();
388        drop(state);
389
390        if let Some(token) = MIXPANEL_TOKEN.as_ref() {
391            let this = self.clone();
392            self.executor
393                .spawn(
394                    async move {
395                        let mut json_bytes = Vec::new();
396
397                        if let Some(file) = &mut this.state.lock().log_file {
398                            let file = file.as_file_mut();
399                            for event in &mut events {
400                                json_bytes.clear();
401                                serde_json::to_writer(&mut json_bytes, event)?;
402                                file.write_all(&json_bytes)?;
403                                file.write(b"\n")?;
404
405                                event.properties.token = token;
406                            }
407                        }
408
409                        json_bytes.clear();
410                        serde_json::to_writer(&mut json_bytes, &events)?;
411                        this.http_client
412                            .post_json(MIXPANEL_EVENTS_URL, json_bytes.into())
413                            .await?;
414                        anyhow::Ok(())
415                    }
416                    .log_err(),
417                )
418                .detach();
419        }
420    }
421
422    fn flush_clickhouse_events(self: &Arc<Self>) {
423        let mut state = self.state.lock();
424        let mut events = mem::take(&mut state.clickhouse_events_queue);
425        state.flush_clickhouse_events_task.take();
426        drop(state);
427
428        let this = self.clone();
429        self.executor
430            .spawn(
431                async move {
432                    let mut json_bytes = Vec::new();
433
434                    if let Some(file) = &mut this.state.lock().log_file {
435                        let file = file.as_file_mut();
436                        for event in &mut events {
437                            json_bytes.clear();
438                            serde_json::to_writer(&mut json_bytes, event)?;
439                            file.write_all(&json_bytes)?;
440                            file.write(b"\n")?;
441                        }
442                    }
443
444                    {
445                        let state = this.state.lock();
446                        json_bytes.clear();
447                        serde_json::to_writer(
448                            &mut json_bytes,
449                            &ClickhouseEventRequestBody {
450                                token: ZED_SECRET_CLIENT_TOKEN,
451                                installation_id: state.installation_id.clone(),
452                                app_version: state.app_version.clone(),
453                                os_name: state.os_name,
454                                os_version: state.os_version.clone(),
455                                release_channel: state.release_channel,
456                                events,
457                            },
458                        )?;
459                    }
460
461                    this.http_client
462                        .post_json(CLICKHOUSE_EVENTS_URL.as_str(), json_bytes.into())
463                        .await?;
464                    anyhow::Ok(())
465                }
466                .log_err(),
467            )
468            .detach();
469    }
470}