telemetry.rs

  1use crate::{ZED_SECRET_CLIENT_TOKEN, ZED_SERVER_URL};
  2use db::kvp::KEY_VALUE_STORE;
  3use gpui::{
  4    executor::Background,
  5    serde_json::{self, value::Map, Value},
  6    AppContext, Task,
  7};
  8use lazy_static::lazy_static;
  9use parking_lot::Mutex;
 10use serde::Serialize;
 11use serde_json::json;
 12use settings::TelemetrySettings;
 13use std::{
 14    io::Write,
 15    mem,
 16    path::PathBuf,
 17    sync::Arc,
 18    time::{Duration, SystemTime, UNIX_EPOCH},
 19};
 20use tempfile::NamedTempFile;
 21use util::http::HttpClient;
 22use util::{channel::ReleaseChannel, post_inc, ResultExt, TryFutureExt};
 23use uuid::Uuid;
 24
 25pub struct Telemetry {
 26    http_client: Arc<dyn HttpClient>,
 27    executor: Arc<Background>,
 28    state: Mutex<TelemetryState>,
 29}
 30
 31#[derive(Default)]
 32struct TelemetryState {
 33    metrics_id: Option<Arc<str>>,      // Per logged-in user
 34    installation_id: Option<Arc<str>>, // Per app installation
 35    app_version: Option<Arc<str>>,
 36    release_channel: Option<&'static str>,
 37    os_version: Option<Arc<str>>,
 38    os_name: &'static str,
 39    mixpanel_events_queue: Vec<MixpanelEvent>,
 40    clickhouse_events_queue: Vec<ClickhouseEventWrapper>,
 41    next_mixpanel_event_id: usize,
 42    flush_mixpanel_events_task: Option<Task<()>>,
 43    flush_clickhouse_events_task: Option<Task<()>>,
 44    log_file: Option<NamedTempFile>,
 45    is_staff: Option<bool>,
 46}
 47
 48const MIXPANEL_EVENTS_URL: &'static str = "https://api.mixpanel.com/track";
 49const MIXPANEL_ENGAGE_URL: &'static str = "https://api.mixpanel.com/engage#profile-set";
 50const CLICKHOUSE_EVENTS_URL_PATH: &'static str = "/api/events";
 51
 52lazy_static! {
 53    static ref MIXPANEL_TOKEN: Option<String> = std::env::var("ZED_MIXPANEL_TOKEN")
 54        .ok()
 55        .or_else(|| option_env!("ZED_MIXPANEL_TOKEN").map(|key| key.to_string()));
 56    static ref CLICKHOUSE_EVENTS_URL: String =
 57        format!("{}{}", *ZED_SERVER_URL, CLICKHOUSE_EVENTS_URL_PATH);
 58}
 59
 60#[derive(Serialize, Debug)]
 61struct ClickhouseEventRequestBody {
 62    token: &'static str,
 63    installation_id: Option<Arc<str>>,
 64    app_version: Option<Arc<str>>,
 65    os_name: &'static str,
 66    os_version: Option<Arc<str>>,
 67    release_channel: Option<&'static str>,
 68    events: Vec<ClickhouseEventWrapper>,
 69}
 70
 71#[derive(Serialize, Debug)]
 72struct ClickhouseEventWrapper {
 73    time: u128,
 74    signed_in: bool,
 75    #[serde(flatten)]
 76    event: ClickhouseEvent,
 77}
 78
 79#[derive(Serialize, Debug)]
 80#[serde(tag = "type")]
 81pub enum ClickhouseEvent {
 82    Editor {
 83        operation: &'static str,
 84        file_extension: Option<String>,
 85        vim_mode: bool,
 86        copilot_enabled: bool,
 87        copilot_enabled_for_language: bool,
 88    },
 89}
 90
 91#[derive(Serialize, Debug)]
 92struct MixpanelEvent {
 93    event: String,
 94    properties: MixpanelEventProperties,
 95}
 96
 97#[derive(Serialize, Debug)]
 98struct MixpanelEventProperties {
 99    // Mixpanel required fields
100    #[serde(skip_serializing_if = "str::is_empty")]
101    token: &'static str,
102    time: u128,
103    #[serde(rename = "distinct_id")]
104    installation_id: Option<Arc<str>>,
105    #[serde(rename = "$insert_id")]
106    insert_id: usize,
107    // Custom fields
108    #[serde(skip_serializing_if = "Option::is_none", flatten)]
109    event_properties: Option<Map<String, Value>>,
110    #[serde(rename = "OS Name")]
111    os_name: &'static str,
112    #[serde(rename = "OS Version")]
113    os_version: Option<Arc<str>>,
114    #[serde(rename = "Release Channel")]
115    release_channel: Option<&'static str>,
116    #[serde(rename = "App Version")]
117    app_version: Option<Arc<str>>,
118    #[serde(rename = "Signed In")]
119    signed_in: bool,
120}
121
122#[derive(Serialize)]
123struct MixpanelEngageRequest {
124    #[serde(rename = "$token")]
125    token: &'static str,
126    #[serde(rename = "$distinct_id")]
127    installation_id: Arc<str>,
128    #[serde(rename = "$set")]
129    set: Value,
130}
131
/// Queue length at which a flush happens immediately instead of being debounced:
/// 1 in builds with debug assertions, 10 otherwise.
const MAX_QUEUE_LEN: usize = if cfg!(debug_assertions) { 1 } else { 10 };

/// Delay before a debounced flush runs: 1 second in builds with debug
/// assertions, 30 seconds otherwise.
const DEBOUNCE_INTERVAL: Duration =
    Duration::from_secs(if cfg!(debug_assertions) { 1 } else { 30 });
143
144impl Telemetry {
145    pub fn new(client: Arc<dyn HttpClient>, cx: &AppContext) -> Arc<Self> {
146        let platform = cx.platform();
147        let release_channel = if cx.has_global::<ReleaseChannel>() {
148            Some(cx.global::<ReleaseChannel>().display_name())
149        } else {
150            None
151        };
152        let this = Arc::new(Self {
153            http_client: client,
154            executor: cx.background().clone(),
155            state: Mutex::new(TelemetryState {
156                os_version: platform.os_version().ok().map(|v| v.to_string().into()),
157                os_name: platform.os_name().into(),
158                app_version: platform.app_version().ok().map(|v| v.to_string().into()),
159                release_channel,
160                installation_id: None,
161                metrics_id: None,
162                mixpanel_events_queue: Default::default(),
163                clickhouse_events_queue: Default::default(),
164                flush_mixpanel_events_task: Default::default(),
165                flush_clickhouse_events_task: Default::default(),
166                next_mixpanel_event_id: 0,
167                log_file: None,
168                is_staff: None,
169            }),
170        });
171
172        if MIXPANEL_TOKEN.is_some() {
173            this.executor
174                .spawn({
175                    let this = this.clone();
176                    async move {
177                        if let Some(tempfile) = NamedTempFile::new().log_err() {
178                            this.state.lock().log_file = Some(tempfile);
179                        }
180                    }
181                })
182                .detach();
183        }
184
185        this
186    }
187
188    pub fn log_file_path(&self) -> Option<PathBuf> {
189        Some(self.state.lock().log_file.as_ref()?.path().to_path_buf())
190    }
191
192    pub fn start(self: &Arc<Self>) {
193        let this = self.clone();
194        self.executor
195            .spawn(
196                async move {
197                    let installation_id =
198                        if let Ok(Some(installation_id)) = KEY_VALUE_STORE.read_kvp("device_id") {
199                            installation_id
200                        } else {
201                            let installation_id = Uuid::new_v4().to_string();
202                            KEY_VALUE_STORE
203                                .write_kvp("device_id".to_string(), installation_id.clone())
204                                .await?;
205                            installation_id
206                        };
207
208                    let installation_id: Arc<str> = installation_id.into();
209                    let mut state = this.state.lock();
210                    state.installation_id = Some(installation_id.clone());
211
212                    for event in &mut state.mixpanel_events_queue {
213                        event
214                            .properties
215                            .installation_id
216                            .get_or_insert_with(|| installation_id.clone());
217                    }
218
219                    let has_mixpanel_events = !state.mixpanel_events_queue.is_empty();
220                    let has_clickhouse_events = !state.clickhouse_events_queue.is_empty();
221                    drop(state);
222
223                    if has_mixpanel_events {
224                        this.flush_mixpanel_events();
225                    }
226
227                    if has_clickhouse_events {
228                        this.flush_clickhouse_events();
229                    }
230
231                    anyhow::Ok(())
232                }
233                .log_err(),
234            )
235            .detach();
236    }
237
238    /// This method takes the entire TelemetrySettings struct in order to force client code
239    /// to pull the struct out of the settings global. Do not remove!
240    pub fn set_authenticated_user_info(
241        self: &Arc<Self>,
242        metrics_id: Option<String>,
243        is_staff: bool,
244        telemetry_settings: TelemetrySettings,
245    ) {
246        if !telemetry_settings.metrics() {
247            return;
248        }
249
250        let this = self.clone();
251        let mut state = self.state.lock();
252        let installation_id = state.installation_id.clone();
253        let metrics_id: Option<Arc<str>> = metrics_id.map(|id| id.into());
254        state.metrics_id = metrics_id.clone();
255        state.is_staff = Some(is_staff);
256        drop(state);
257
258        if let Some((token, installation_id)) = MIXPANEL_TOKEN.as_ref().zip(installation_id) {
259            self.executor
260                .spawn(
261                    async move {
262                        let json_bytes = serde_json::to_vec(&[MixpanelEngageRequest {
263                            token,
264                            installation_id,
265                            set: json!({
266                                "Staff": is_staff,
267                                "ID": metrics_id,
268                                "App": true
269                            }),
270                        }])?;
271
272                        this.http_client
273                            .post_json(MIXPANEL_ENGAGE_URL, json_bytes.into())
274                            .await?;
275                        anyhow::Ok(())
276                    }
277                    .log_err(),
278                )
279                .detach();
280        }
281    }
282
283    pub fn report_clickhouse_event(
284        self: &Arc<Self>,
285        event: ClickhouseEvent,
286        telemetry_settings: TelemetrySettings,
287    ) {
288        if !telemetry_settings.metrics() {
289            return;
290        }
291
292        let mut state = self.state.lock();
293        let signed_in = state.metrics_id.is_some();
294        state.clickhouse_events_queue.push(ClickhouseEventWrapper {
295            time: SystemTime::now()
296                .duration_since(UNIX_EPOCH)
297                .unwrap()
298                .as_millis(),
299            signed_in,
300            event,
301        });
302
303        if state.installation_id.is_some() {
304            if state.mixpanel_events_queue.len() >= MAX_QUEUE_LEN {
305                drop(state);
306                self.flush_clickhouse_events();
307            } else {
308                let this = self.clone();
309                let executor = self.executor.clone();
310                state.flush_clickhouse_events_task = Some(self.executor.spawn(async move {
311                    executor.timer(DEBOUNCE_INTERVAL).await;
312                    this.flush_clickhouse_events();
313                }));
314            }
315        }
316    }
317
318    pub fn report_mixpanel_event(
319        self: &Arc<Self>,
320        kind: &str,
321        properties: Value,
322        telemetry_settings: TelemetrySettings,
323    ) {
324        if !telemetry_settings.metrics() {
325            return;
326        }
327
328        let mut state = self.state.lock();
329        let event = MixpanelEvent {
330            event: kind.into(),
331            properties: MixpanelEventProperties {
332                token: "",
333                time: SystemTime::now()
334                    .duration_since(UNIX_EPOCH)
335                    .unwrap()
336                    .as_millis(),
337                installation_id: state.installation_id.clone(),
338                insert_id: post_inc(&mut state.next_mixpanel_event_id),
339                event_properties: if let Value::Object(properties) = properties {
340                    Some(properties)
341                } else {
342                    None
343                },
344                os_name: state.os_name,
345                os_version: state.os_version.clone(),
346                release_channel: state.release_channel,
347                app_version: state.app_version.clone(),
348                signed_in: state.metrics_id.is_some(),
349            },
350        };
351        state.mixpanel_events_queue.push(event);
352        if state.installation_id.is_some() {
353            if state.mixpanel_events_queue.len() >= MAX_QUEUE_LEN {
354                drop(state);
355                self.flush_mixpanel_events();
356            } else {
357                let this = self.clone();
358                let executor = self.executor.clone();
359                state.flush_mixpanel_events_task = Some(self.executor.spawn(async move {
360                    executor.timer(DEBOUNCE_INTERVAL).await;
361                    this.flush_mixpanel_events();
362                }));
363            }
364        }
365    }
366
367    pub fn metrics_id(self: &Arc<Self>) -> Option<Arc<str>> {
368        self.state.lock().metrics_id.clone()
369    }
370
371    pub fn installation_id(self: &Arc<Self>) -> Option<Arc<str>> {
372        self.state.lock().installation_id.clone()
373    }
374
375    pub fn is_staff(self: &Arc<Self>) -> Option<bool> {
376        self.state.lock().is_staff
377    }
378
379    fn flush_mixpanel_events(self: &Arc<Self>) {
380        let mut state = self.state.lock();
381        let mut events = mem::take(&mut state.mixpanel_events_queue);
382        state.flush_mixpanel_events_task.take();
383        drop(state);
384
385        if let Some(token) = MIXPANEL_TOKEN.as_ref() {
386            let this = self.clone();
387            self.executor
388                .spawn(
389                    async move {
390                        let mut json_bytes = Vec::new();
391
392                        if let Some(file) = &mut this.state.lock().log_file {
393                            let file = file.as_file_mut();
394                            for event in &mut events {
395                                json_bytes.clear();
396                                serde_json::to_writer(&mut json_bytes, event)?;
397                                file.write_all(&json_bytes)?;
398                                file.write(b"\n")?;
399
400                                event.properties.token = token;
401                            }
402                        }
403
404                        json_bytes.clear();
405                        serde_json::to_writer(&mut json_bytes, &events)?;
406                        this.http_client
407                            .post_json(MIXPANEL_EVENTS_URL, json_bytes.into())
408                            .await?;
409                        anyhow::Ok(())
410                    }
411                    .log_err(),
412                )
413                .detach();
414        }
415    }
416
417    fn flush_clickhouse_events(self: &Arc<Self>) {
418        let mut state = self.state.lock();
419        let mut events = mem::take(&mut state.clickhouse_events_queue);
420        state.flush_clickhouse_events_task.take();
421        drop(state);
422
423        let this = self.clone();
424        self.executor
425            .spawn(
426                async move {
427                    let mut json_bytes = Vec::new();
428
429                    if let Some(file) = &mut this.state.lock().log_file {
430                        let file = file.as_file_mut();
431                        for event in &mut events {
432                            json_bytes.clear();
433                            serde_json::to_writer(&mut json_bytes, event)?;
434                            file.write_all(&json_bytes)?;
435                            file.write(b"\n")?;
436                        }
437                    }
438
439                    {
440                        let state = this.state.lock();
441                        json_bytes.clear();
442                        serde_json::to_writer(
443                            &mut json_bytes,
444                            &ClickhouseEventRequestBody {
445                                token: ZED_SECRET_CLIENT_TOKEN,
446                                installation_id: state.installation_id.clone(),
447                                app_version: state.app_version.clone(),
448                                os_name: state.os_name,
449                                os_version: state.os_version.clone(),
450                                release_channel: state.release_channel,
451                                events,
452                            },
453                        )?;
454                    }
455
456                    this.http_client
457                        .post_json(CLICKHOUSE_EVENTS_URL.as_str(), json_bytes.into())
458                        .await?;
459                    anyhow::Ok(())
460                }
461                .log_err(),
462            )
463            .detach();
464    }
465}