telemetry.rs

  1use crate::{TelemetrySettings, ZED_SECRET_CLIENT_TOKEN, ZED_SERVER_URL};
  2use db::kvp::KEY_VALUE_STORE;
  3use gpui::{
  4    executor::Background,
  5    serde_json::{self, value::Map, Value},
  6    AppContext, Task,
  7};
  8use lazy_static::lazy_static;
  9use parking_lot::Mutex;
 10use serde::Serialize;
 11use serde_json::json;
 12use std::{
 13    io::Write,
 14    mem,
 15    path::PathBuf,
 16    sync::Arc,
 17    time::{Duration, SystemTime, UNIX_EPOCH},
 18};
 19use tempfile::NamedTempFile;
 20use util::http::HttpClient;
 21use util::{channel::ReleaseChannel, post_inc, ResultExt, TryFutureExt};
 22use uuid::Uuid;
 23
   /// Client-side telemetry collector: queues events and posts them in batches.
 24pub struct Telemetry {
       /// HTTP client used for every `post_json` request made by this type.
 25    http_client: Arc<dyn HttpClient>,
       /// Background executor on which flushes, debounce timers and setup tasks are spawned.
 26    executor: Arc<Background>,
       /// All mutable telemetry state, guarded by a mutex.
 27    state: Mutex<TelemetryState>,
 28}
 29
   /// Mutable state behind `Telemetry::state`: identity/platform metadata plus the
   /// pending event queues and their scheduled flush tasks.
 30#[derive(Default)]
 31struct TelemetryState {
 32    metrics_id: Option<Arc<str>>,      // Per logged-in user
 33    installation_id: Option<Arc<str>>, // Per app installation
 34    app_version: Option<Arc<str>>,
 35    release_channel: Option<&'static str>,
 36    os_version: Option<Arc<str>>,
 37    os_name: &'static str,
       // Events reported but not yet handed to a flush.
 38    mixpanel_events_queue: Vec<MixpanelEvent>,
 39    clickhouse_events_queue: Vec<ClickhouseEventWrapper>,
       // Counter that supplies each Mixpanel event's `$insert_id`.
 40    next_mixpanel_event_id: usize,
       // Debounced flush tasks; replaced on every queued event and taken when a flush runs.
 41    flush_mixpanel_events_task: Option<Task<()>>,
 42    flush_clickhouse_events_task: Option<Task<()>>,
       // Temporary file that flushed events are appended to, one JSON object per line.
 43    log_file: Option<NamedTempFile>,
 44    is_staff: Option<bool>,
 45}
 46
 47const MIXPANEL_EVENTS_URL: &'static str = "https://api.mixpanel.com/track";
 48const MIXPANEL_ENGAGE_URL: &'static str = "https://api.mixpanel.com/engage#profile-set";
 49const CLICKHOUSE_EVENTS_URL_PATH: &'static str = "/api/events";
 50
 51lazy_static! {
 52    static ref MIXPANEL_TOKEN: Option<String> = std::env::var("ZED_MIXPANEL_TOKEN")
 53        .ok()
 54        .or_else(|| option_env!("ZED_MIXPANEL_TOKEN").map(|key| key.to_string()));
 55    static ref CLICKHOUSE_EVENTS_URL: String =
 56        format!("{}{}", *ZED_SERVER_URL, CLICKHOUSE_EVENTS_URL_PATH);
 57}
 58
   /// JSON body posted to `CLICKHOUSE_EVENTS_URL`: one set of installation/platform
   /// metadata shared by the whole batch, followed by the batched events.
 59#[derive(Serialize, Debug)]
 60struct ClickhouseEventRequestBody {
       /// Client token sent with the request (filled with `ZED_SECRET_CLIENT_TOKEN`).
 61    token: &'static str,
 62    installation_id: Option<Arc<str>>,
 63    app_version: Option<Arc<str>>,
 64    os_name: &'static str,
 65    os_version: Option<Arc<str>>,
 66    release_channel: Option<&'static str>,
       /// The events drained from the queue for this flush.
 67    events: Vec<ClickhouseEventWrapper>,
 68}
 69
   /// A queued ClickHouse event together with the per-event metadata captured when it
   /// was reported.
 70#[derive(Serialize, Debug)]
 71struct ClickhouseEventWrapper {
       /// Milliseconds since the Unix epoch at the time the event was reported.
 72    time: u128,
       /// Whether a metrics id was set (i.e. a user was signed in) when the event was reported.
 73    signed_in: bool,
       /// The event payload; its fields are flattened into this object when serialized.
 74    #[serde(flatten)]
 75    event: ClickhouseEvent,
 76}
 77
   /// Event payloads that can be reported to ClickHouse.
   ///
   /// Serialized as an internally tagged enum: the variant name is written under the
   /// `"type"` key alongside the variant's fields.
 78#[derive(Serialize, Debug)]
 79#[serde(tag = "type")]
 80pub enum ClickhouseEvent {
       /// An editor-related event.
       // NOTE(review): the field meanings below are inferred from their names only;
       // confirm against the call sites that construct this variant.
 81    Editor {
 82        operation: &'static str,
 83        file_extension: Option<String>,
 84        vim_mode: bool,
 85        copilot_enabled: bool,
 86        copilot_enabled_for_language: bool,
 87    },
 88}
 89
   /// A single queued Mixpanel event: its name plus the properties sent with it.
 90#[derive(Serialize, Debug)]
 91struct MixpanelEvent {
       /// Event name (the `kind` passed to `report_mixpanel_event`).
 92    event: String,
       /// Required Mixpanel fields and custom properties for the event.
 93    properties: MixpanelEventProperties,
 94}
 95
   /// Properties serialized with every Mixpanel event.
   ///
   /// The `rename` attributes give the JSON keys actually sent; field declaration order
   /// is the serialization order.
 96#[derive(Serialize, Debug)]
 97struct MixpanelEventProperties {
 98    // Mixpanel required fields
       // Left empty when the event is queued (and therefore omitted from the JSON);
       // the real token is assigned during flushing, after the log-file line is written.
 99    #[serde(skip_serializing_if = "str::is_empty")]
100    token: &'static str,
       // Milliseconds since the Unix epoch at the time the event was reported.
101    time: u128,
102    #[serde(rename = "distinct_id")]
103    installation_id: Option<Arc<str>>,
       // Taken from the per-process `next_mixpanel_event_id` counter.
104    #[serde(rename = "$insert_id")]
105    insert_id: usize,
106    // Custom fields
       // Caller-supplied JSON object, flattened into this object; skipped when absent.
107    #[serde(skip_serializing_if = "Option::is_none", flatten)]
108    event_properties: Option<Map<String, Value>>,
109    #[serde(rename = "OS Name")]
110    os_name: &'static str,
111    #[serde(rename = "OS Version")]
112    os_version: Option<Arc<str>>,
113    #[serde(rename = "Release Channel")]
114    release_channel: Option<&'static str>,
115    #[serde(rename = "App Version")]
116    app_version: Option<Arc<str>>,
       // True when a metrics id was set at the time the event was reported.
117    #[serde(rename = "Signed In")]
118    signed_in: bool,
119}
120
   /// Profile update posted to `MIXPANEL_ENGAGE_URL` (sent as a one-element JSON array).
121#[derive(Serialize)]
122struct MixpanelEngageRequest {
       /// Mixpanel project token.
123    #[serde(rename = "$token")]
124    token: &'static str,
       /// Installation id, used as the Mixpanel distinct id of the profile.
125    #[serde(rename = "$distinct_id")]
126    installation_id: Arc<str>,
       /// JSON object of profile properties to set.
127    #[serde(rename = "$set")]
128    set: Value,
129}
130
/// Queue length at which a newly reported event triggers an immediate flush instead of
/// a debounced one: 1 in builds with debug assertions, 10 otherwise.
const MAX_QUEUE_LEN: usize = if cfg!(debug_assertions) { 1 } else { 10 };

/// How long a debounced flush waits before sending: 1 second in builds with debug
/// assertions, 30 seconds otherwise.
const DEBOUNCE_INTERVAL: Duration =
    Duration::from_secs(if cfg!(debug_assertions) { 1 } else { 30 });
142
143impl Telemetry {
144    pub fn new(client: Arc<dyn HttpClient>, cx: &AppContext) -> Arc<Self> {
145        let platform = cx.platform();
146        let release_channel = if cx.has_global::<ReleaseChannel>() {
147            Some(cx.global::<ReleaseChannel>().display_name())
148        } else {
149            None
150        };
151        let this = Arc::new(Self {
152            http_client: client,
153            executor: cx.background().clone(),
154            state: Mutex::new(TelemetryState {
155                os_version: platform.os_version().ok().map(|v| v.to_string().into()),
156                os_name: platform.os_name().into(),
157                app_version: platform.app_version().ok().map(|v| v.to_string().into()),
158                release_channel,
159                installation_id: None,
160                metrics_id: None,
161                mixpanel_events_queue: Default::default(),
162                clickhouse_events_queue: Default::default(),
163                flush_mixpanel_events_task: Default::default(),
164                flush_clickhouse_events_task: Default::default(),
165                next_mixpanel_event_id: 0,
166                log_file: None,
167                is_staff: None,
168            }),
169        });
170
171        if MIXPANEL_TOKEN.is_some() {
172            this.executor
173                .spawn({
174                    let this = this.clone();
175                    async move {
176                        if let Some(tempfile) = NamedTempFile::new().log_err() {
177                            this.state.lock().log_file = Some(tempfile);
178                        }
179                    }
180                })
181                .detach();
182        }
183
184        this
185    }
186
187    pub fn log_file_path(&self) -> Option<PathBuf> {
188        Some(self.state.lock().log_file.as_ref()?.path().to_path_buf())
189    }
190
191    pub fn start(self: &Arc<Self>) {
192        let this = self.clone();
193        self.executor
194            .spawn(
195                async move {
196                    let installation_id =
197                        if let Ok(Some(installation_id)) = KEY_VALUE_STORE.read_kvp("device_id") {
198                            installation_id
199                        } else {
200                            let installation_id = Uuid::new_v4().to_string();
201                            KEY_VALUE_STORE
202                                .write_kvp("device_id".to_string(), installation_id.clone())
203                                .await?;
204                            installation_id
205                        };
206
207                    let installation_id: Arc<str> = installation_id.into();
208                    let mut state = this.state.lock();
209                    state.installation_id = Some(installation_id.clone());
210
211                    for event in &mut state.mixpanel_events_queue {
212                        event
213                            .properties
214                            .installation_id
215                            .get_or_insert_with(|| installation_id.clone());
216                    }
217
218                    let has_mixpanel_events = !state.mixpanel_events_queue.is_empty();
219                    let has_clickhouse_events = !state.clickhouse_events_queue.is_empty();
220                    drop(state);
221
222                    if has_mixpanel_events {
223                        this.flush_mixpanel_events();
224                    }
225
226                    if has_clickhouse_events {
227                        this.flush_clickhouse_events();
228                    }
229
230                    anyhow::Ok(())
231                }
232                .log_err(),
233            )
234            .detach();
235    }
236
237    /// This method takes the entire TelemetrySettings struct in order to force client code
238    /// to pull the struct out of the settings global. Do not remove!
239    pub fn set_authenticated_user_info(
240        self: &Arc<Self>,
241        metrics_id: Option<String>,
242        is_staff: bool,
243        cx: &AppContext,
244    ) {
245        if !settings::get_setting::<TelemetrySettings>(None, cx).metrics {
246            return;
247        }
248
249        let this = self.clone();
250        let mut state = self.state.lock();
251        let installation_id = state.installation_id.clone();
252        let metrics_id: Option<Arc<str>> = metrics_id.map(|id| id.into());
253        state.metrics_id = metrics_id.clone();
254        state.is_staff = Some(is_staff);
255        drop(state);
256
257        if let Some((token, installation_id)) = MIXPANEL_TOKEN.as_ref().zip(installation_id) {
258            self.executor
259                .spawn(
260                    async move {
261                        let json_bytes = serde_json::to_vec(&[MixpanelEngageRequest {
262                            token,
263                            installation_id,
264                            set: json!({
265                                "Staff": is_staff,
266                                "ID": metrics_id,
267                                "App": true
268                            }),
269                        }])?;
270
271                        this.http_client
272                            .post_json(MIXPANEL_ENGAGE_URL, json_bytes.into())
273                            .await?;
274                        anyhow::Ok(())
275                    }
276                    .log_err(),
277                )
278                .detach();
279        }
280    }
281
282    pub fn report_clickhouse_event(
283        self: &Arc<Self>,
284        event: ClickhouseEvent,
285        telemetry_settings: TelemetrySettings,
286    ) {
287        if !telemetry_settings.metrics {
288            return;
289        }
290
291        let mut state = self.state.lock();
292        let signed_in = state.metrics_id.is_some();
293        state.clickhouse_events_queue.push(ClickhouseEventWrapper {
294            time: SystemTime::now()
295                .duration_since(UNIX_EPOCH)
296                .unwrap()
297                .as_millis(),
298            signed_in,
299            event,
300        });
301
302        if state.installation_id.is_some() {
303            if state.mixpanel_events_queue.len() >= MAX_QUEUE_LEN {
304                drop(state);
305                self.flush_clickhouse_events();
306            } else {
307                let this = self.clone();
308                let executor = self.executor.clone();
309                state.flush_clickhouse_events_task = Some(self.executor.spawn(async move {
310                    executor.timer(DEBOUNCE_INTERVAL).await;
311                    this.flush_clickhouse_events();
312                }));
313            }
314        }
315    }
316
317    pub fn report_mixpanel_event(
318        self: &Arc<Self>,
319        kind: &str,
320        properties: Value,
321        telemetry_settings: TelemetrySettings,
322    ) {
323        if !telemetry_settings.metrics {
324            return;
325        }
326
327        let mut state = self.state.lock();
328        let event = MixpanelEvent {
329            event: kind.into(),
330            properties: MixpanelEventProperties {
331                token: "",
332                time: SystemTime::now()
333                    .duration_since(UNIX_EPOCH)
334                    .unwrap()
335                    .as_millis(),
336                installation_id: state.installation_id.clone(),
337                insert_id: post_inc(&mut state.next_mixpanel_event_id),
338                event_properties: if let Value::Object(properties) = properties {
339                    Some(properties)
340                } else {
341                    None
342                },
343                os_name: state.os_name,
344                os_version: state.os_version.clone(),
345                release_channel: state.release_channel,
346                app_version: state.app_version.clone(),
347                signed_in: state.metrics_id.is_some(),
348            },
349        };
350        state.mixpanel_events_queue.push(event);
351        if state.installation_id.is_some() {
352            if state.mixpanel_events_queue.len() >= MAX_QUEUE_LEN {
353                drop(state);
354                self.flush_mixpanel_events();
355            } else {
356                let this = self.clone();
357                let executor = self.executor.clone();
358                state.flush_mixpanel_events_task = Some(self.executor.spawn(async move {
359                    executor.timer(DEBOUNCE_INTERVAL).await;
360                    this.flush_mixpanel_events();
361                }));
362            }
363        }
364    }
365
366    pub fn metrics_id(self: &Arc<Self>) -> Option<Arc<str>> {
367        self.state.lock().metrics_id.clone()
368    }
369
370    pub fn installation_id(self: &Arc<Self>) -> Option<Arc<str>> {
371        self.state.lock().installation_id.clone()
372    }
373
374    pub fn is_staff(self: &Arc<Self>) -> Option<bool> {
375        self.state.lock().is_staff
376    }
377
378    fn flush_mixpanel_events(self: &Arc<Self>) {
379        let mut state = self.state.lock();
380        let mut events = mem::take(&mut state.mixpanel_events_queue);
381        state.flush_mixpanel_events_task.take();
382        drop(state);
383
384        if let Some(token) = MIXPANEL_TOKEN.as_ref() {
385            let this = self.clone();
386            self.executor
387                .spawn(
388                    async move {
389                        let mut json_bytes = Vec::new();
390
391                        if let Some(file) = &mut this.state.lock().log_file {
392                            let file = file.as_file_mut();
393                            for event in &mut events {
394                                json_bytes.clear();
395                                serde_json::to_writer(&mut json_bytes, event)?;
396                                file.write_all(&json_bytes)?;
397                                file.write(b"\n")?;
398
399                                event.properties.token = token;
400                            }
401                        }
402
403                        json_bytes.clear();
404                        serde_json::to_writer(&mut json_bytes, &events)?;
405                        this.http_client
406                            .post_json(MIXPANEL_EVENTS_URL, json_bytes.into())
407                            .await?;
408                        anyhow::Ok(())
409                    }
410                    .log_err(),
411                )
412                .detach();
413        }
414    }
415
416    fn flush_clickhouse_events(self: &Arc<Self>) {
417        let mut state = self.state.lock();
418        let mut events = mem::take(&mut state.clickhouse_events_queue);
419        state.flush_clickhouse_events_task.take();
420        drop(state);
421
422        let this = self.clone();
423        self.executor
424            .spawn(
425                async move {
426                    let mut json_bytes = Vec::new();
427
428                    if let Some(file) = &mut this.state.lock().log_file {
429                        let file = file.as_file_mut();
430                        for event in &mut events {
431                            json_bytes.clear();
432                            serde_json::to_writer(&mut json_bytes, event)?;
433                            file.write_all(&json_bytes)?;
434                            file.write(b"\n")?;
435                        }
436                    }
437
438                    {
439                        let state = this.state.lock();
440                        json_bytes.clear();
441                        serde_json::to_writer(
442                            &mut json_bytes,
443                            &ClickhouseEventRequestBody {
444                                token: ZED_SECRET_CLIENT_TOKEN,
445                                installation_id: state.installation_id.clone(),
446                                app_version: state.app_version.clone(),
447                                os_name: state.os_name,
448                                os_version: state.os_version.clone(),
449                                release_channel: state.release_channel,
450                                events,
451                            },
452                        )?;
453                    }
454
455                    this.http_client
456                        .post_json(CLICKHOUSE_EVENTS_URL.as_str(), json_bytes.into())
457                        .await?;
458                    anyhow::Ok(())
459                }
460                .log_err(),
461            )
462            .detach();
463    }
464}