telemetry.rs

  1use crate::{ZED_SECRET_CLIENT_TOKEN, ZED_SERVER_URL};
  2use db::kvp::KEY_VALUE_STORE;
  3use gpui::{
  4    executor::Background,
  5    serde_json::{self, value::Map, Value},
  6    AppContext, Task,
  7};
  8use lazy_static::lazy_static;
  9use parking_lot::Mutex;
 10use serde::Serialize;
 11use serde_json::json;
 12use settings::TelemetrySettings;
 13use std::{
 14    io::Write,
 15    mem,
 16    path::PathBuf,
 17    sync::Arc,
 18    time::{Duration, SystemTime, UNIX_EPOCH},
 19};
 20use tempfile::NamedTempFile;
 21use util::http::HttpClient;
 22use util::{channel::ReleaseChannel, post_inc, ResultExt, TryFutureExt};
 23use uuid::Uuid;
 24
 25pub struct Telemetry {
 26    http_client: Arc<dyn HttpClient>,
 27    executor: Arc<Background>,
 28    state: Mutex<TelemetryState>,
 29}
 30
 31#[derive(Default)]
 32struct TelemetryState {
 33    metrics_id: Option<Arc<str>>,
 34    device_id: Option<Arc<str>>,
 35    app_version: Option<Arc<str>>,
 36    release_channel: Option<&'static str>,
 37    os_version: Option<Arc<str>>,
 38    os_name: &'static str,
 39    mixpanel_events_queue: Vec<MixpanelEvent>, // Mixpanel mixed events - will hopefully die soon
 40    clickhouse_events_queue: Vec<ClickhouseEventWrapper>,
 41    next_mixpanel_event_id: usize,
 42    flush_mixpanel_events_task: Option<Task<()>>,
 43    flush_clickhouse_events_task: Option<Task<()>>,
 44    log_file: Option<NamedTempFile>,
 45    is_staff: Option<bool>,
 46}
 47
 48const MIXPANEL_EVENTS_URL: &'static str = "https://api.mixpanel.com/track";
 49const MIXPANEL_ENGAGE_URL: &'static str = "https://api.mixpanel.com/engage#profile-set";
 50const CLICKHOUSE_EVENTS_URL_PATH: &'static str = "/api/events";
 51
 52lazy_static! {
 53    static ref MIXPANEL_TOKEN: Option<String> = std::env::var("ZED_MIXPANEL_TOKEN")
 54        .ok()
 55        .or_else(|| option_env!("ZED_MIXPANEL_TOKEN").map(|key| key.to_string()));
 56    static ref CLICKHOUSE_EVENTS_URL: String =
 57        format!("{}{}", *ZED_SERVER_URL, CLICKHOUSE_EVENTS_URL_PATH);
 58}
 59
 60#[derive(Serialize, Debug)]
 61struct ClickhouseEventRequestBody {
 62    token: &'static str,
 63    installation_id: Option<Arc<str>>,
 64    app_version: Option<Arc<str>>,
 65    os_name: &'static str,
 66    os_version: Option<Arc<str>>,
 67    release_channel: Option<&'static str>,
 68    events: Vec<ClickhouseEventWrapper>,
 69}
 70
 71#[derive(Serialize, Debug)]
 72struct ClickhouseEventWrapper {
 73    time: u128,
 74    signed_in: bool,
 75    #[serde(flatten)]
 76    event: ClickhouseEvent,
 77}
 78
 79#[derive(Serialize, Debug)]
 80#[serde(tag = "type")]
 81pub enum ClickhouseEvent {
 82    Editor {
 83        operation: &'static str,
 84        file_extension: Option<String>,
 85        vim_mode: bool,
 86        copilot_enabled: bool,
 87        copilot_enabled_for_language: bool,
 88    },
 89}
 90
 91#[derive(Serialize, Debug)]
 92struct MixpanelEvent {
 93    event: String,
 94    properties: MixpanelEventProperties,
 95}
 96
 97#[derive(Serialize, Debug)]
 98struct MixpanelEventProperties {
 99    // Mixpanel required fields
100    #[serde(skip_serializing_if = "str::is_empty")]
101    token: &'static str,
102    time: u128,
103    distinct_id: Option<Arc<str>>,
104    #[serde(rename = "$insert_id")]
105    insert_id: usize,
106    // Custom fields
107    #[serde(skip_serializing_if = "Option::is_none", flatten)]
108    event_properties: Option<Map<String, Value>>,
109    #[serde(rename = "OS Name")]
110    os_name: &'static str,
111    #[serde(rename = "OS Version")]
112    os_version: Option<Arc<str>>,
113    #[serde(rename = "Release Channel")]
114    release_channel: Option<&'static str>,
115    #[serde(rename = "App Version")]
116    app_version: Option<Arc<str>>,
117    #[serde(rename = "Signed In")]
118    signed_in: bool,
119}
120
121#[derive(Serialize)]
122struct MixpanelEngageRequest {
123    #[serde(rename = "$token")]
124    token: &'static str,
125    #[serde(rename = "$distinct_id")]
126    distinct_id: Arc<str>,
127    #[serde(rename = "$set")]
128    set: Value,
129}
130
/// Queue length at which events are flushed immediately instead of waiting for
/// the debounce timer: 1 in builds with debug assertions, 10 otherwise.
const MAX_QUEUE_LEN: usize = if cfg!(debug_assertions) { 1 } else { 10 };

/// Delay before a debounced flush runs: 1 second in builds with debug
/// assertions, 30 seconds otherwise.
const DEBOUNCE_INTERVAL: Duration =
    Duration::from_secs(if cfg!(debug_assertions) { 1 } else { 30 });
142
143impl Telemetry {
144    pub fn new(client: Arc<dyn HttpClient>, cx: &AppContext) -> Arc<Self> {
145        let platform = cx.platform();
146        let release_channel = if cx.has_global::<ReleaseChannel>() {
147            Some(cx.global::<ReleaseChannel>().display_name())
148        } else {
149            None
150        };
151        let this = Arc::new(Self {
152            http_client: client,
153            executor: cx.background().clone(),
154            state: Mutex::new(TelemetryState {
155                os_version: platform.os_version().ok().map(|v| v.to_string().into()),
156                os_name: platform.os_name().into(),
157                app_version: platform.app_version().ok().map(|v| v.to_string().into()),
158                release_channel,
159                device_id: None,
160                metrics_id: None,
161                mixpanel_events_queue: Default::default(),
162                clickhouse_events_queue: Default::default(),
163                flush_mixpanel_events_task: Default::default(),
164                flush_clickhouse_events_task: Default::default(),
165                next_mixpanel_event_id: 0,
166                log_file: None,
167                is_staff: None,
168            }),
169        });
170
171        if MIXPANEL_TOKEN.is_some() {
172            this.executor
173                .spawn({
174                    let this = this.clone();
175                    async move {
176                        if let Some(tempfile) = NamedTempFile::new().log_err() {
177                            this.state.lock().log_file = Some(tempfile);
178                        }
179                    }
180                })
181                .detach();
182        }
183
184        this
185    }
186
187    pub fn log_file_path(&self) -> Option<PathBuf> {
188        Some(self.state.lock().log_file.as_ref()?.path().to_path_buf())
189    }
190
191    pub fn start(self: &Arc<Self>) {
192        let this = self.clone();
193        self.executor
194            .spawn(
195                async move {
196                    let device_id =
197                        if let Ok(Some(device_id)) = KEY_VALUE_STORE.read_kvp("device_id") {
198                            device_id
199                        } else {
200                            let device_id = Uuid::new_v4().to_string();
201                            KEY_VALUE_STORE
202                                .write_kvp("device_id".to_string(), device_id.clone())
203                                .await?;
204                            device_id
205                        };
206
207                    let device_id: Arc<str> = device_id.into();
208                    let mut state = this.state.lock();
209                    state.device_id = Some(device_id.clone());
210
211                    for event in &mut state.mixpanel_events_queue {
212                        event
213                            .properties
214                            .distinct_id
215                            .get_or_insert_with(|| device_id.clone());
216                    }
217
218                    let has_mixpanel_events = !state.mixpanel_events_queue.is_empty();
219                    let has_clickhouse_events = !state.clickhouse_events_queue.is_empty();
220                    drop(state);
221
222                    if has_mixpanel_events {
223                        this.flush_mixpanel_events();
224                    }
225
226                    if has_clickhouse_events {
227                        this.flush_clickhouse_events();
228                    }
229
230                    anyhow::Ok(())
231                }
232                .log_err(),
233            )
234            .detach();
235    }
236
237    /// This method takes the entire TelemetrySettings struct in order to force client code
238    /// to pull the struct out of the settings global. Do not remove!
239    pub fn set_authenticated_user_info(
240        self: &Arc<Self>,
241        metrics_id: Option<String>,
242        is_staff: bool,
243        telemetry_settings: TelemetrySettings,
244    ) {
245        if !telemetry_settings.metrics() {
246            return;
247        }
248
249        let this = self.clone();
250        let mut state = self.state.lock();
251        let device_id = state.device_id.clone();
252        let metrics_id: Option<Arc<str>> = metrics_id.map(|id| id.into());
253        state.metrics_id = metrics_id.clone();
254        state.is_staff = Some(is_staff);
255        drop(state);
256
257        if let Some((token, device_id)) = MIXPANEL_TOKEN.as_ref().zip(device_id) {
258            self.executor
259                .spawn(
260                    async move {
261                        let json_bytes = serde_json::to_vec(&[MixpanelEngageRequest {
262                            token,
263                            distinct_id: device_id,
264                            set: json!({
265                                "Staff": is_staff,
266                                "ID": metrics_id,
267                                "App": true
268                            }),
269                        }])?;
270
271                        this.http_client
272                            .post_json(MIXPANEL_ENGAGE_URL, json_bytes.into())
273                            .await?;
274                        anyhow::Ok(())
275                    }
276                    .log_err(),
277                )
278                .detach();
279        }
280    }
281
282    pub fn report_clickhouse_event(
283        self: &Arc<Self>,
284        event: ClickhouseEvent,
285        telemetry_settings: TelemetrySettings,
286    ) {
287        if !telemetry_settings.metrics() {
288            return;
289        }
290
291        let mut state = self.state.lock();
292        let signed_in = state.metrics_id.is_some();
293        state.clickhouse_events_queue.push(ClickhouseEventWrapper {
294            time: SystemTime::now()
295                .duration_since(UNIX_EPOCH)
296                .unwrap()
297                .as_millis(),
298            signed_in,
299            event,
300        });
301
302        if state.device_id.is_some() {
303            if state.mixpanel_events_queue.len() >= MAX_QUEUE_LEN {
304                drop(state);
305                self.flush_clickhouse_events();
306            } else {
307                let this = self.clone();
308                let executor = self.executor.clone();
309                state.flush_clickhouse_events_task = Some(self.executor.spawn(async move {
310                    executor.timer(DEBOUNCE_INTERVAL).await;
311                    this.flush_clickhouse_events();
312                }));
313            }
314        }
315    }
316
317    pub fn report_mixpanel_event(
318        self: &Arc<Self>,
319        kind: &str,
320        properties: Value,
321        telemetry_settings: TelemetrySettings,
322    ) {
323        if !telemetry_settings.metrics() {
324            return;
325        }
326
327        let mut state = self.state.lock();
328        let event = MixpanelEvent {
329            event: kind.into(),
330            properties: MixpanelEventProperties {
331                token: "",
332                time: SystemTime::now()
333                    .duration_since(UNIX_EPOCH)
334                    .unwrap()
335                    .as_millis(),
336                distinct_id: state.device_id.clone(),
337                insert_id: post_inc(&mut state.next_mixpanel_event_id),
338                event_properties: if let Value::Object(properties) = properties {
339                    Some(properties)
340                } else {
341                    None
342                },
343                os_name: state.os_name,
344                os_version: state.os_version.clone(),
345                release_channel: state.release_channel,
346                app_version: state.app_version.clone(),
347                signed_in: state.metrics_id.is_some(),
348            },
349        };
350        state.mixpanel_events_queue.push(event);
351        if state.device_id.is_some() {
352            if state.mixpanel_events_queue.len() >= MAX_QUEUE_LEN {
353                drop(state);
354                self.flush_mixpanel_events();
355            } else {
356                let this = self.clone();
357                let executor = self.executor.clone();
358                state.flush_mixpanel_events_task = Some(self.executor.spawn(async move {
359                    executor.timer(DEBOUNCE_INTERVAL).await;
360                    this.flush_mixpanel_events();
361                }));
362            }
363        }
364    }
365
366    pub fn metrics_id(self: &Arc<Self>) -> Option<Arc<str>> {
367        self.state.lock().metrics_id.clone()
368    }
369
370    pub fn is_staff(self: &Arc<Self>) -> Option<bool> {
371        self.state.lock().is_staff
372    }
373
374    fn flush_mixpanel_events(self: &Arc<Self>) {
375        let mut state = self.state.lock();
376        let mut events = mem::take(&mut state.mixpanel_events_queue);
377        state.flush_mixpanel_events_task.take();
378        drop(state);
379
380        if let Some(token) = MIXPANEL_TOKEN.as_ref() {
381            let this = self.clone();
382            self.executor
383                .spawn(
384                    async move {
385                        let mut json_bytes = Vec::new();
386
387                        if let Some(file) = &mut this.state.lock().log_file {
388                            let file = file.as_file_mut();
389                            for event in &mut events {
390                                json_bytes.clear();
391                                serde_json::to_writer(&mut json_bytes, event)?;
392                                file.write_all(&json_bytes)?;
393                                file.write(b"\n")?;
394
395                                event.properties.token = token;
396                            }
397                        }
398
399                        json_bytes.clear();
400                        serde_json::to_writer(&mut json_bytes, &events)?;
401                        this.http_client
402                            .post_json(MIXPANEL_EVENTS_URL, json_bytes.into())
403                            .await?;
404                        anyhow::Ok(())
405                    }
406                    .log_err(),
407                )
408                .detach();
409        }
410    }
411
412    fn flush_clickhouse_events(self: &Arc<Self>) {
413        let mut state = self.state.lock();
414        let mut events = mem::take(&mut state.clickhouse_events_queue);
415        state.flush_clickhouse_events_task.take();
416        drop(state);
417
418        let this = self.clone();
419        self.executor
420            .spawn(
421                async move {
422                    let mut json_bytes = Vec::new();
423
424                    if let Some(file) = &mut this.state.lock().log_file {
425                        let file = file.as_file_mut();
426                        for event in &mut events {
427                            json_bytes.clear();
428                            serde_json::to_writer(&mut json_bytes, event)?;
429                            file.write_all(&json_bytes)?;
430                            file.write(b"\n")?;
431                        }
432                    }
433
434                    {
435                        let state = this.state.lock();
436                        json_bytes.clear();
437                        serde_json::to_writer(
438                            &mut json_bytes,
439                            &ClickhouseEventRequestBody {
440                                token: ZED_SECRET_CLIENT_TOKEN,
441                                installation_id: state.device_id.clone(),
442                                app_version: state.app_version.clone(),
443                                os_name: state.os_name,
444                                os_version: state.os_version.clone(),
445                                release_channel: state.release_channel,
446                                events,
447                            },
448                        )?;
449                    }
450
451                    this.http_client
452                        .post_json(CLICKHOUSE_EVENTS_URL.as_str(), json_bytes.into())
453                        .await?;
454                    anyhow::Ok(())
455                }
456                .log_err(),
457            )
458            .detach();
459    }
460}